diff --git a/cmd/ethereum/broadcast.go b/cmd/ethereum/broadcast.go index 236f0bd..391d078 100644 --- a/cmd/ethereum/broadcast.go +++ b/cmd/ethereum/broadcast.go @@ -22,6 +22,7 @@ import ( // Broadcast broadcasts a message to Ethereum func Broadcast( + ctx context.Context, cfg config.Config, logger log.Logger, msg *types.MessageState, @@ -30,6 +31,9 @@ func Broadcast( // set up eth client client, err := ethclient.Dial(cfg.Networks.Destination.Ethereum.RPC) + if err != nil { + return nil, fmt.Errorf("unable to dial ethereum client: %w", err) + } defer client.Close() privEcdsaKey, ethereumAddress, err := GetEcdsaKeyAddress(cfg.Networks.Minters[0].MinterPrivateKey) @@ -38,7 +42,15 @@ func Broadcast( } auth, err := bind.NewKeyedTransactorWithChainID(privEcdsaKey, big.NewInt(cfg.Networks.Destination.Ethereum.ChainId)) + if err != nil { + return nil, fmt.Errorf("unable to create auth: %w", err) + } + messageTransmitter, err := NewMessageTransmitter(common.HexToAddress(cfg.Networks.Source.Ethereum.MessageTransmitter), client) + if err != nil { + return nil, fmt.Errorf("unable to create message transmitter: %w", err) + } + attestationBytes, err := hex.DecodeString(msg.Attestation[2:]) if err != nil { return nil, errors.New("unable to decode message attestation") @@ -58,8 +70,9 @@ func Broadcast( // check if nonce already used co := &bind.CallOpts{ Pending: true, - Context: context.Background(), + Context: ctx, } + key := append( common.LeftPadBytes((big.NewInt(int64(msg.SourceDomain))).Bytes(), 4), common.LeftPadBytes((big.NewInt(int64(msg.Nonce))).Bytes(), 8)..., @@ -69,6 +82,7 @@ func Broadcast( if nonceErr != nil { logger.Debug("Error querying whether nonce was used. Continuing...") } else { + fmt.Printf("received used nonce response: %d\n", response) if response.Uint64() == uint64(1) { // nonce has already been used, mark as complete logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d", diff --git a/cmd/ethereum/broadcast_test.go b/cmd/ethereum/broadcast_test.go new file mode 100644 index 0000000..ae517e8 --- /dev/null +++ b/cmd/ethereum/broadcast_test.go @@ -0,0 +1,22 @@ +package ethereum_test + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestEthUsedNonce(t *testing.T) { + sourceDomain := uint32(4) + nonce := uint64(5) + + key := append( + common.LeftPadBytes((big.NewInt(int64(sourceDomain))).Bytes(), 4), + common.LeftPadBytes((big.NewInt(int64(nonce))).Bytes(), 8)..., + ) + + require.Equal(t, []byte("\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05"), key) + +} diff --git a/cmd/noble/broadcast.go b/cmd/noble/broadcast.go index 88c86c2..4848489 100644 --- a/cmd/noble/broadcast.go +++ b/cmd/noble/broadcast.go @@ -39,22 +39,6 @@ func Broadcast( msg *types.MessageState, sequenceMap *types.SequenceMap, ) (*ctypes.ResultBroadcastTx, error) { - cc, err := cosmos.NewProvider(cfg.Networks.Source.Noble.RPC) - if err != nil { - return nil, fmt.Errorf("unable to build cosmos provider for noble: %w", err) - } - - used, err := cc.QueryUsedNonce(ctx, msg.SourceDomain, msg.Nonce) - if err != nil { - return nil, fmt.Errorf("unable to query used nonce: %w", err) - } - - if used { - msg.Status = types.Complete - logger.Info(fmt.Sprintf("Noble CCTP minter nonce %d already used", msg.Nonce)) - return nil, nil - } - // set up sdk context interfaceRegistry := codectypes.NewInterfaceRegistry() nobletypes.RegisterInterfaces(interfaceRegistry) @@ -97,7 +81,22 @@ func Broadcast( return nil, errors.New("failed to set up rpc client") } + cc, err := cosmos.NewProvider(cfg.Networks.Source.Noble.RPC) + if err != nil { + return nil, fmt.Errorf("unable to build cosmos provider for noble: %w", err) + } + for attempt := 0; attempt <= cfg.Networks.Destination.Noble.BroadcastRetries; attempt++ { + used, err := cc.QueryUsedNonce(ctx, msg.SourceDomain, msg.Nonce) + if err != nil { + return nil, fmt.Errorf("unable to query used nonce: %w", err) + } + + if used { + msg.Status = types.Complete + return nil, fmt.Errorf("noble cctp minter nonce %d already used", msg.Nonce) + } + logger.Info(fmt.Sprintf( "Broadcasting %s message from %d to %d: with source tx hash %s", msg.Type, diff --git a/cmd/process.go b/cmd/process.go index 355b83e..ad7adc5 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -141,7 +141,7 @@ func StartProcessor(ctx context.Context, cfg config.Config, logger log.Logger, p if msg.Status == types.Attested { switch msg.DestDomain { case 0: // ethereum - response, err := ethereum.Broadcast(cfg, logger, msg, sequenceMap) + response, err := ethereum.Broadcast(ctx, cfg, logger, msg, sequenceMap) if err != nil { logger.Error("unable to mint on Ethereum", "err", err) processingQueue <- msg @@ -160,10 +160,6 @@ func StartProcessor(ctx context.Context, cfg config.Config, logger log.Logger, p processingQueue <- msg continue } - if response == nil { - // nothing to do - continue - } if response.Code != 0 { logger.Error("nonzero response code received", "err", err) processingQueue <- msg