diff --git a/README.md b/README.md index 94bae60..d4a1f7d 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,14 @@ Consider a 30 minute flush interval (1800 seconds) - Polygon: 2 second blocks = (1800 / 2) = `900 blocks` - Arbitrum: 0.26 second blocks = (1800 / 0.26) = `~6950 blocks` +### Flush Only Mode + +This relayer also supports a `--flush-only-mode` flag. In this mode the relayer only runs the periodic flush and does not actively listen for new events as they occur. This is useful for running a secondary relayer which "lags" behind the primary relayer. It is only responsible for retrying failed transactions. + +When the relayer is in flush only mode, the first flush will start at `latest height - (4 * lookback period)` and finish at `latest height - (3 * lookback period)`. For all subsequent flushes, the relayer will start at the last flushed block and finish at `latest height - (3 * lookback period)`. Please see the notes above for configuring the flush interval and lookback period. + +> Note: It is highly recommended to use the same configuration for both the primary and secondary relayer. This ensures that there is zero overlap between the relayers. + ### Prometheus Metrics By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. 
diff --git a/circle/attestation.go b/circle/attestation.go index 4776798..b866076 100644 --- a/circle/attestation.go +++ b/circle/attestation.go @@ -15,13 +15,6 @@ import ( // CheckAttestation checks the iris api for attestation status and returns true if attestation is complete func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse { - logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", attestationURL, "0x", irisLookupID, txHash, sourceDomain, destDomain)) - - client := http.Client{Timeout: 2 * time.Second} - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - // append ending / if not present if attestationURL[len(attestationURL)-1:] != "/" { attestationURL += "/" @@ -32,22 +25,30 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str irisLookupID = "0x" + irisLookupID } + logger.Debug(fmt.Sprintf("Checking attestation for %s%s for source tx %s from %d to %d", attestationURL, irisLookupID, txHash, sourceDomain, destDomain)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, attestationURL+irisLookupID, nil) if err != nil { logger.Debug("error creating request: " + err.Error()) return nil } + client := http.Client{} rawResponse, err := client.Do(req) if err != nil { logger.Debug("error during request: " + err.Error()) return nil } + defer rawResponse.Body.Close() if rawResponse.StatusCode != http.StatusOK { logger.Debug("non 200 response received from Circles attestation API") return nil } + body, err := io.ReadAll(rawResponse.Body) if err != nil { logger.Debug("unable to parse message body") @@ -60,7 +61,8 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str logger.Debug("unable to unmarshal response") return nil } 
- logger.Info(fmt.Sprintf("Attestation found for %s%s%s", attestationURL, "0x", irisLookupID)) + + logger.Info(fmt.Sprintf("Attestation found for %s%s", attestationURL, irisLookupID)) return &response } diff --git a/cmd/flags.go b/cmd/flags.go index b6c80b0..7cac21f 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -13,6 +13,7 @@ const ( flagJSON = "json" flagMetricsPort = "metrics-port" flagFlushInterval = "flush-interval" + flagFlushOnlyMode = "flush-only-mode" ) func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { @@ -21,6 +22,7 @@ func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)") cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port") cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run") + cmd.PersistentFlags().BoolP(flagFlushOnlyMode, "f", false, "only run the background flush routine (acts as a redundant relayer)") return cmd } diff --git a/cmd/process.go b/cmd/process.go index 3f499f1..b9665d0 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -38,10 +38,29 @@ func Start(a *AppState) *cobra.Command { PersistentPreRun: func(cmd *cobra.Command, _ []string) { a.InitAppState() }, - Run: func(cmd *cobra.Command, args []string) { + RunE: func(cmd *cobra.Command, args []string) error { logger := a.Logger cfg := a.Config + flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) + if err != nil { + logger.Error("Invalid flush interval", "error", err) + } + + flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode) + if err != nil { + return fmt.Errorf("invalid flush only flag error=%w", err) + } + + if flushInterval == 0 { + if flushOnly { + return fmt.Errorf("flush only mode requires a flush interval") + } else { + logger.Error("Flush interval not set. 
Use the --flush-interval flag to set a reoccurring flush") + } + } + + // start API on normal relayer only go startAPI(a) // messageState processing queue @@ -51,16 +70,7 @@ func Start(a *AppState) *cobra.Command { port, err := cmd.Flags().GetInt16(flagMetricsPort) if err != nil { - logger.Error("Invalid port", "error", err) - os.Exit(1) - } - - flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) - if err != nil { - logger.Error("Invalid flush interval", "error", err) - } - if flushInterval == 0 { - logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") + return fmt.Errorf("invalid port error=%w", err) } metrics := relayer.InitPromMetrics(port) @@ -68,15 +78,13 @@ func Start(a *AppState) *cobra.Command { for name, cfg := range cfg.Chains { c, err := cfg.Chain(name) if err != nil { - logger.Error("Error creating chain", "err: ", err) - os.Exit(1) + return fmt.Errorf("error creating chain error=%w", err) } logger = logger.With("name", c.Name(), "domain", c.Domain()) if err := c.InitializeClients(cmd.Context(), logger); err != nil { - logger.Error("Error initializing client", "err", err) - os.Exit(1) + return fmt.Errorf("error initializing client error=%w", err) } go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics) @@ -90,22 +98,20 @@ func Start(a *AppState) *cobra.Command { break } if i == maxRetries-1 { - logger.Error("Unable to get height") - os.Exit(1) + return fmt.Errorf("unable to get height") } } if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil { - logger.Error("Error initializing broadcaster", "error", err) - os.Exit(1) + return fmt.Errorf("error initializing broadcaster error=%w", err) } - go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval) + go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval) + go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics) if _, ok := registeredDomains[c.Domain()]; ok { - 
logger.Error("Duplicate domain found", "domain", c.Domain(), "name:", c.Name()) - os.Exit(1) + return fmt.Errorf("duplicate domain found domain=%d name=%s", c.Domain(), c.Name()) } registeredDomains[c.Domain()] = c @@ -116,17 +122,19 @@ func Start(a *AppState) *cobra.Command { go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics) } - defer func() { - for _, c := range registeredDomains { - logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())) - err := c.CloseClients() - if err != nil { - logger.Error("Error closing clients", "error", err) - } + // wait for context to be done + <-cmd.Context().Done() + + // close clients & output latest block heights + for _, c := range registeredDomains { + logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())) + err := c.CloseClients() + if err != nil { + logger.Error("Error closing clients", "error", err) } - }() + } - <-cmd.Context().Done() + return nil }, } diff --git a/ethereum/listener.go b/ethereum/listener.go index 346f3a8..4183f0c 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -33,6 +33,7 @@ func (e *Ethereum) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushOnlyMode bool, flushInterval time.Duration, ) { logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) @@ -55,43 +56,48 @@ func (e *Ethereum) StartListener( Ready: make(chan struct{}), } - // start main stream (does not account for lookback period or specific start block) - stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress) + // FlushOnlyMode is used for the secondary, flush only relayer. When enabled, the main stream is not started. 
+ if flushOnlyMode { + go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig) + } else { + // start main stream (does not account for lookback period or specific start block) + stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress) - go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig) - consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) + go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig) + consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) - // get history from (start block - lookback) up until latest block - latestBlock := e.LatestBlock() - start := latestBlock - if e.startBlock != 0 { - start = e.startBlock - } - startLookback := start - e.lookbackPeriod - logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) - e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock) + // get history from (start block - lookback) up until latest block + latestBlock := e.LatestBlock() + start := latestBlock + if e.startBlock != 0 { + start = e.startBlock + } + startLookback := start - e.lookbackPeriod - logger.Info("Finished getting history") + logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) + e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock) + logger.Info("Finished getting history") - if flushInterval > 0 { - go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig) - } + if 
flushInterval > 0 { + go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig) + } - // listen for errors in the main websocket stream - // if error occurs, trigger sig.Ready - // This will cancel `consumeStream` and `flushMechanism` routines - select { - case <-ctx.Done(): - return - case err := <-sub.Err(): - logger.Error("Websocket disconnected. Reconnecting...", "err", err) - close(sig.Ready) - - // restart - e.startBlock = e.lastFlushedBlock - time.Sleep(10 * time.Millisecond) - e.StartListener(ctx, logger, processingQueue, flushInterval) - return + // listen for errors in the main websocket stream + // if error occurs, trigger sig.Ready + // This will cancel `consumeStream` and `flushMechanism` routines + select { + case <-ctx.Done(): + return + case err := <-sub.Err(): + logger.Error("Websocket disconnected. Reconnecting...", "err", err) + close(sig.Ready) + + // restart + e.startBlock = e.lastFlushedBlock + time.Sleep(10 * time.Millisecond) + e.StartListener(ctx, logger, processingQueue, flushOnlyMode, flushInterval) + return + } } } @@ -278,11 +284,19 @@ func (e *Ethereum) flushMechanism( messageSent abi.Event, messageTransmitterAddress common.Address, messageTransmitterABI abi.ABI, + flushOnlyMode bool, flushInterval time.Duration, sig *errSignal, ) { logger.Info(fmt.Sprintf("Starting flush mechanism. 
Will flush every %v", flushInterval)) + // extraFlushBlocks is used to add an extra space between latest height and last flushed block + // this setting should only be used for the secondary, flush only relayer + extraFlushBlocks := uint64(0) + if flushOnlyMode { + extraFlushBlocks = 2 * e.lookbackPeriod + } + for { timer := time.NewTimer(flushInterval) select { @@ -291,7 +305,7 @@ func (e *Ethereum) flushMechanism( // initialize first lastFlushedBlock if not set if e.lastFlushedBlock == 0 { - e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod + e.lastFlushedBlock = latestBlock - (2*e.lookbackPeriod + extraFlushBlocks) if latestBlock < e.lookbackPeriod { e.lastFlushedBlock = 0 @@ -302,7 +316,7 @@ func (e *Ethereum) flushMechanism( startBlock := e.lastFlushedBlock // set finish block to be latestBlock - lookbackPeriod - finishBlock := latestBlock - e.lookbackPeriod + finishBlock := latestBlock - (e.lookbackPeriod + extraFlushBlocks) if startBlock >= finishBlock { logger.Debug("No new blocks to flush") diff --git a/ethereum/listener_test.go b/ethereum/listener_test.go index a6614e7..f261b24 100644 --- a/ethereum/listener_test.go +++ b/ethereum/listener_test.go @@ -28,7 +28,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go eth.StartListener(ctx, a.Logger, processingQueue, 0) + go eth.StartListener(ctx, a.Logger, processingQueue, false, 0) time.Sleep(5 * time.Second) diff --git a/integration/eth_burn_to_noble_mint_test.go b/integration/eth_burn_to_noble_mint_test.go index 7157421..ba565a0 100644 --- a/integration/eth_burn_to_noble_mint_test.go +++ b/integration/eth_burn_to_noble_mint_test.go @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go ethChain.StartListener(ctx, a.Logger, processingQueue, 0) + go ethChain.StartListener(ctx, a.Logger, processingQueue, false, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) _, 
_, generatedWallet := testdata.KeyTestPubAddr() diff --git a/integration/noble_burn_to_eth_mint_test.go b/integration/noble_burn_to_eth_mint_test.go index 85bed0f..239e0fb 100644 --- a/integration/noble_burn_to_eth_mint_test.go +++ b/integration/noble_burn_to_eth_mint_test.go @@ -79,7 +79,7 @@ func TestNobleBurnToEthMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0) + go nobleChain.StartListener(ctx, a.Logger, processingQueue, false, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) ethDestinationAddress, _, err := generateEthWallet() diff --git a/noble/listener.go b/noble/listener.go index 7c4830c..e14fef1 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -17,6 +17,7 @@ func (n *Noble) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushOnlyMode bool, flushInterval_ time.Duration, ) { logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) @@ -48,40 +49,42 @@ func (n *Noble) StartListener( } blockQueue := make(chan uint64, n.blockQueueChannelSize) - // history - currentBlock -= lookback - for currentBlock <= chainTip { - blockQueue <- currentBlock - currentBlock++ - } + if !flushOnlyMode { + // history + currentBlock -= lookback + for currentBlock <= chainTip { + blockQueue <- currentBlock + currentBlock++ + } - // listen for new blocks - go func() { - // inner function to queue blocks - queueBlocks := func() { - chainTip = n.LatestBlock() - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i + // listen for new blocks + go func() { + // inner function to queue blocks + queueBlocks := func() { + chainTip = n.LatestBlock() + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i + } + currentBlock = chainTip + 1 } - currentBlock = chainTip + 1 } - } - // initial queue - queueBlocks() 
- - for { - timer := time.NewTimer(6 * time.Second) - select { - case <-timer.C: - queueBlocks() - case <-ctx.Done(): - timer.Stop() - return + // initial queue + queueBlocks() + + for { + timer := time.NewTimer(6 * time.Second) + select { + case <-timer.C: + queueBlocks() + case <-ctx.Done(): + timer.Stop() + return + } } - } - }() + }() + } // constantly query for blocks for i := 0; i < int(n.workers); i++ { @@ -115,7 +118,7 @@ func (n *Noble) StartListener( } if flushInterval > 0 { - go n.flushMechanism(ctx, logger, blockQueue) + go n.flushMechanism(ctx, logger, blockQueue, flushOnlyMode) } <-ctx.Done() @@ -135,8 +138,16 @@ func (n *Noble) flushMechanism( ctx context.Context, logger log.Logger, blockQueue chan uint64, + flushOnlyMode bool, ) { - logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval)) + logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval)) + + // extraFlushBlocks is used to add an extra space between latest height and last flushed block + // this setting should only be used for the secondary, flush only relayer + extraFlushBlocks := uint64(0) + if flushOnlyMode { + extraFlushBlocks = 2 * n.lookbackPeriod + } for { timer := time.NewTimer(flushInterval) @@ -157,7 +168,7 @@ func (n *Noble) flushMechanism( // initialize first lastFlushedBlock if not set if n.lastFlushedBlock == 0 { - n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod) + n.lastFlushedBlock = latestBlock - (2*n.lookbackPeriod + extraFlushBlocks) if latestBlock < n.lookbackPeriod { n.lastFlushedBlock = 0 @@ -168,7 +179,7 @@ func (n *Noble) flushMechanism( startBlock := n.lastFlushedBlock // set finish block to be latestBlock - lookbackPeriod - finishBlock := latestBlock - n.lookbackPeriod + finishBlock := latestBlock - (n.lookbackPeriod + extraFlushBlocks) if startBlock >= finishBlock { logger.Debug("No new blocks to flush") diff --git a/noble/listener_test.go b/noble/listener_test.go index 2d07ef0..45a8b8d 100644 
--- a/noble/listener_test.go +++ b/noble/listener_test.go @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go n.StartListener(ctx, a.Logger, processingQueue, 0) + go n.StartListener(ctx, a.Logger, processingQueue, false, 0) time.Sleep(20 * time.Second) diff --git a/types/chain.go b/types/chain.go index d68c998..92d7b7f 100644 --- a/types/chain.go +++ b/types/chain.go @@ -53,6 +53,7 @@ type Chain interface { ctx context.Context, logger log.Logger, processingQueue chan *TxState, + flushOnlyMode bool, flushInterval time.Duration, )