From 9f4430fd07002b9b7fd0074af085e1b61f05999a Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 6 May 2024 16:32:01 -0500 Subject: [PATCH 1/6] Add flush only mode --- cmd/flags.go | 2 + cmd/process.go | 32 ++++++--- ethereum/listener.go | 83 +++++++++++++--------- ethereum/listener_test.go | 2 +- integration/eth_burn_to_noble_mint_test.go | 2 +- integration/noble_burn_to_eth_mint_test.go | 2 +- noble/listener.go | 77 +++++++++++--------- noble/listener_test.go | 2 +- types/chain.go | 1 + 9 files changed, 123 insertions(+), 80 deletions(-) diff --git a/cmd/flags.go b/cmd/flags.go index b6c80b0..7cac21f 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -13,6 +13,7 @@ const ( flagJSON = "json" flagMetricsPort = "metrics-port" flagFlushInterval = "flush-interval" + flagFlushOnlyMode = "flush-only-mode" ) func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { @@ -21,6 +22,7 @@ func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command { cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)") cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port") cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run") + cmd.PersistentFlags().BoolP(flagFlushOnlyMode, "f", false, "only run the background flush routine (acts as a redundant relayer)") return cmd } diff --git a/cmd/process.go b/cmd/process.go index 3f499f1..4bcc7c8 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -42,6 +42,27 @@ func Start(a *AppState) *cobra.Command { logger := a.Logger cfg := a.Config + flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) + if err != nil { + logger.Error("Invalid flush interval", "error", err) + } + + flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode) + if err != nil { + logger.Error("Invalid flush only flag", "error", err) + os.Exit(1) + } + + if flushInterval == 0 { + if flushOnly { + logger.Error("Flush only mode requires a flush interval") + os.Exit(1) + } else { + logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") + } + } + + // start API on normal relayer only go startAPI(a) // messageState processing queue @@ -55,14 +76,6 @@ func Start(a *AppState) *cobra.Command { os.Exit(1) } - flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval) - if err != nil { - logger.Error("Invalid flush interval", "error", err) - } - if flushInterval == 0 { - logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") - } - metrics := relayer.InitPromMetrics(port) for name, cfg := range cfg.Chains { @@ -100,7 +113,8 @@ func Start(a *AppState) *cobra.Command { os.Exit(1) } - go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval) + go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval) + go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics) if _, ok := registeredDomains[c.Domain()]; ok { diff --git a/ethereum/listener.go b/ethereum/listener.go index 346f3a8..4daf974 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -33,6 +33,7 @@ func (e *Ethereum) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushOnlyMode bool, flushInterval time.Duration, ) { logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain) @@ -55,43 +56,49 @@ func (e *Ethereum) StartListener( Ready: make(chan struct{}), } - // start main stream (does not account for lookback period or specific start block) - stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress) + // FlushOnlyMode should only run the flush mechanism, otherwise start the main listener, + // otherwise consume history and incoming msgs + if flushOnlyMode { + go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig) + } else { + // start main stream (does not account for lookback period or specific start block) + stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress) - go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig) - consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) + go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig) + consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI) - // get history from (start block - lookback) up until latest block - latestBlock := e.LatestBlock() - start := latestBlock - if e.startBlock != 0 { - start = e.startBlock - } - startLookback := start - e.lookbackPeriod - logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) - e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock) + // get history from (start block - lookback) up until latest block + latestBlock := e.LatestBlock() + start := latestBlock + if e.startBlock != 0 { + start = e.startBlock + } + startLookback := start - e.lookbackPeriod - logger.Info("Finished getting history") + logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod)) + e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock) + logger.Info("Finished getting history") - if flushInterval > 0 { - go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig) - } + if flushInterval > 0 { + go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig) + } - // listen for errors in the main websocket stream - // if error occurs, trigger sig.Ready - // This will cancel `consumeStream` and `flushMechanism` routines - select { - case <-ctx.Done(): - return - case err := <-sub.Err(): - logger.Error("Websocket disconnected. Reconnecting...", "err", err) - close(sig.Ready) - - // restart - e.startBlock = e.lastFlushedBlock - time.Sleep(10 * time.Millisecond) - e.StartListener(ctx, logger, processingQueue, flushInterval) - return + // listen for errors in the main websocket stream + // if error occurs, trigger sig.Ready + // This will cancel `consumeStream` and `flushMechanism` routines + select { + case <-ctx.Done(): + return + case err := <-sub.Err(): + logger.Error("Websocket disconnected. Reconnecting...", "err", err) + close(sig.Ready) + + // restart + e.startBlock = e.lastFlushedBlock + time.Sleep(10 * time.Millisecond) + e.StartListener(ctx, logger, processingQueue, flushOnlyMode, flushInterval) + return + } } } @@ -278,11 +285,19 @@ func (e *Ethereum) flushMechanism( messageSent abi.Event, messageTransmitterAddress common.Address, messageTransmitterABI abi.ABI, + flushOnlyMode bool, flushInterval time.Duration, sig *errSignal, ) { logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval)) + // extraFlushBlocks is used to add an extra space between latest height and last flushed block + // this setting should only be used for the secondary, flush only relayer + extraFlushBlocks := uint64(0) + if flushOnlyMode { + extraFlushBlocks = 2 * e.lookbackPeriod + } + for { timer := time.NewTimer(flushInterval) select { @@ -291,7 +306,7 @@ func (e *Ethereum) flushMechanism( // initialize first lastFlushedBlock if not set if e.lastFlushedBlock == 0 { - e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod + e.lastFlushedBlock = latestBlock - (2*e.lookbackPeriod + extraFlushBlocks) if latestBlock < e.lookbackPeriod { e.lastFlushedBlock = 0 @@ -302,7 +317,7 @@ func (e *Ethereum) flushMechanism( startBlock := e.lastFlushedBlock // set finish block to be latestBlock - lookbackPeriod - finishBlock := latestBlock - e.lookbackPeriod + finishBlock := latestBlock - (e.lookbackPeriod + extraFlushBlocks) if startBlock >= finishBlock { logger.Debug("No new blocks to flush") diff --git a/ethereum/listener_test.go b/ethereum/listener_test.go index a6614e7..f261b24 100644 --- a/ethereum/listener_test.go +++ b/ethereum/listener_test.go @@ -28,7 +28,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go eth.StartListener(ctx, a.Logger, processingQueue, 0) + go eth.StartListener(ctx, a.Logger, processingQueue, false, 0) time.Sleep(5 * time.Second) diff --git a/integration/eth_burn_to_noble_mint_test.go b/integration/eth_burn_to_noble_mint_test.go index 7157421..ba565a0 100644 --- a/integration/eth_burn_to_noble_mint_test.go +++ b/integration/eth_burn_to_noble_mint_test.go @@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go ethChain.StartListener(ctx, a.Logger, processingQueue, 0) + go ethChain.StartListener(ctx, a.Logger, processingQueue, false, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) _, _, generatedWallet := testdata.KeyTestPubAddr() diff --git a/integration/noble_burn_to_eth_mint_test.go b/integration/noble_burn_to_eth_mint_test.go index 85bed0f..239e0fb 100644 --- a/integration/noble_burn_to_eth_mint_test.go +++ b/integration/noble_burn_to_eth_mint_test.go @@ -79,7 +79,7 @@ func TestNobleBurnToEthMint(t *testing.T) { processingQueue := make(chan *types.TxState, 10) - go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0) + go nobleChain.StartListener(ctx, a.Logger, processingQueue, false, 0) go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil) ethDestinationAddress, _, err := generateEthWallet() diff --git a/noble/listener.go b/noble/listener.go index 7c4830c..e14fef1 100644 --- a/noble/listener.go +++ b/noble/listener.go @@ -17,6 +17,7 @@ func (n *Noble) StartListener( ctx context.Context, logger log.Logger, processingQueue chan *types.TxState, + flushOnlyMode bool, flushInterval_ time.Duration, ) { logger = logger.With("chain", n.Name(), "chain_id", n.chainID, "domain", n.Domain()) @@ -48,40 +49,42 @@ func (n *Noble) StartListener( } blockQueue := make(chan uint64, n.blockQueueChannelSize) - // history - currentBlock -= lookback - for currentBlock <= chainTip { - blockQueue <- currentBlock - currentBlock++ - } + if !flushOnlyMode { + // history + currentBlock -= lookback + for currentBlock <= chainTip { + blockQueue <- currentBlock + currentBlock++ + } - // listen for new blocks - go func() { - // inner function to queue blocks - queueBlocks := func() { - chainTip = n.LatestBlock() - if chainTip >= currentBlock { - for i := currentBlock; i <= chainTip; i++ { - blockQueue <- i + // listen for new blocks + go func() { + // inner function to queue blocks + queueBlocks := func() { + chainTip = n.LatestBlock() + if chainTip >= currentBlock { + for i := currentBlock; i <= chainTip; i++ { + blockQueue <- i + } + currentBlock = chainTip + 1 } - currentBlock = chainTip + 1 } - } - // initial queue - queueBlocks() - - for { - timer := time.NewTimer(6 * time.Second) - select { - case <-timer.C: - queueBlocks() - case <-ctx.Done(): - timer.Stop() - return + // initial queue + queueBlocks() + + for { + timer := time.NewTimer(6 * time.Second) + select { + case <-timer.C: + queueBlocks() + case <-ctx.Done(): + timer.Stop() + return + } } - } - }() + }() + } // constantly query for blocks for i := 0; i < int(n.workers); i++ { @@ -115,7 +118,7 @@ func (n *Noble) StartListener( } if flushInterval > 0 { - go n.flushMechanism(ctx, logger, blockQueue) + go n.flushMechanism(ctx, logger, blockQueue, flushOnlyMode) } <-ctx.Done() @@ -135,8 +138,16 @@ func (n *Noble) flushMechanism( ctx context.Context, logger log.Logger, blockQueue chan uint64, + flushOnlyMode bool, ) { - logger.Debug(fmt.Sprintf("Flush mechanism started. Will flush every %v", flushInterval)) + logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval)) + + // extraFlushBlocks is used to add an extra space between latest height and last flushed block + // this setting should only be used for the secondary, flush only relayer + extraFlushBlocks := uint64(0) + if flushOnlyMode { + extraFlushBlocks = 2 * n.lookbackPeriod + } for { timer := time.NewTimer(flushInterval) @@ -157,7 +168,7 @@ func (n *Noble) flushMechanism( // initialize first lastFlushedBlock if not set if n.lastFlushedBlock == 0 { - n.lastFlushedBlock = latestBlock - (2 * n.lookbackPeriod) + n.lastFlushedBlock = latestBlock - (2*n.lookbackPeriod + extraFlushBlocks) if latestBlock < n.lookbackPeriod { n.lastFlushedBlock = 0 @@ -168,7 +179,7 @@ func (n *Noble) flushMechanism( startBlock := n.lastFlushedBlock // set finish block to be latestBlock - lookbackPeriod - finishBlock := latestBlock - n.lookbackPeriod + finishBlock := latestBlock - (n.lookbackPeriod + extraFlushBlocks) if startBlock >= finishBlock { logger.Debug("No new blocks to flush") diff --git a/noble/listener_test.go b/noble/listener_test.go index 2d07ef0..45a8b8d 100644 --- a/noble/listener_test.go +++ b/noble/listener_test.go @@ -27,7 +27,7 @@ func TestStartListener(t *testing.T) { processingQueue := make(chan *types.TxState, 10000) - go n.StartListener(ctx, a.Logger, processingQueue, 0) + go n.StartListener(ctx, a.Logger, processingQueue, false, 0) time.Sleep(20 * time.Second) diff --git a/types/chain.go b/types/chain.go index d68c998..92d7b7f 100644 --- a/types/chain.go +++ b/types/chain.go @@ -53,6 +53,7 @@ type Chain interface { ctx context.Context, logger log.Logger, processingQueue chan *TxState, + flushOnlyMode bool, flushInterval time.Duration, ) From cb30e8221e890d093a9b18a141ab842a19a551d8 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 6 May 2024 16:32:08 -0500 Subject: [PATCH 2/6] Update README --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index 94bae60..d4a1f7d 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,14 @@ Consider a 30 minute flush interval (1800 seconds) - Polygon: 2 second blocks = (1800 / 2) = `900 blocks` - Arbitrum: 0.26 second blocks = (1800 / 0.26) = `~6950 blocks` +### Flush Only Mode + +This relayer also supports a `--flush-only-mode`. This mode will only flush the chain and not actively listen for new events as they occur. This is useful for running a secondary relayer which "lags" behind the primary relayer. It is only responsible for retrying failed transactions. + +When the relayer is in flush only mode, the flush mechanism will start at `latest height - (4 * lookback period)` and finish at `latest height - (3 * lookback period)`. For all subsequent flushes, the relayer will start at the last flushed block and finish at `latest height - (3 * lookback period)`. Please see the notes above for configuring the flush interval and lookback period. + +> Note: It is highly recommended to use the same configuration for both the primary and secondary relayer. This ensures that there is zero overlap between the relayers. + ### Prometheus Metrics By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag. From 0bd414deeb5c20f607d6f125c8ff67cc4e2cf742 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 6 May 2024 16:32:24 -0500 Subject: [PATCH 3/6] Fix URL Parsing & Redundant Timeout --- circle/attestation.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/circle/attestation.go b/circle/attestation.go index 4776798..b866076 100644 --- a/circle/attestation.go +++ b/circle/attestation.go @@ -15,13 +15,6 @@ import ( // CheckAttestation checks the iris api for attestation status and returns true if attestation is complete func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse { - logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", attestationURL, "0x", irisLookupID, txHash, sourceDomain, destDomain)) - - client := http.Client{Timeout: 2 * time.Second} - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - // append ending / if not present if attestationURL[len(attestationURL)-1:] != "/" { attestationURL += "/" @@ -32,22 +25,30 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str irisLookupID = "0x" + irisLookupID } + logger.Debug(fmt.Sprintf("Checking attestation for %s%s for source tx %s from %d to %d", attestationURL, irisLookupID, txHash, sourceDomain, destDomain)) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, attestationURL+irisLookupID, nil) if err != nil { logger.Debug("error creating request: " + err.Error()) return nil } + client := http.Client{} rawResponse, err := client.Do(req) if err != nil { logger.Debug("error during request: " + err.Error()) return nil } + defer rawResponse.Body.Close() if rawResponse.StatusCode != http.StatusOK { logger.Debug("non 200 response received from Circles attestation API") return nil } + body, err := io.ReadAll(rawResponse.Body) if err != nil { logger.Debug("unable to parse message body") @@ -60,7 +61,8 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str logger.Debug("unable to unmarshal response") return nil } - logger.Info(fmt.Sprintf("Attestation found for %s%s%s", attestationURL, "0x", irisLookupID)) + + logger.Info(fmt.Sprintf("Attestation found for %s%s", attestationURL, irisLookupID)) return &response } From b0586fc63187ee48d11cc480e6bd36e51b75089d Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 20 May 2024 15:39:53 -0500 Subject: [PATCH 4/6] Fix confusing comment --- ethereum/listener.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ethereum/listener.go b/ethereum/listener.go index 4daf974..4183f0c 100644 --- a/ethereum/listener.go +++ b/ethereum/listener.go @@ -56,8 +56,7 @@ func (e *Ethereum) StartListener( Ready: make(chan struct{}), } - // FlushOnlyMode should only run the flush mechanism, otherwise start the main listener, - // otherwise consume history and incoming msgs + // FlushOnlyMode is used for the secondary, flush only relayer. When enabled, the main stream is not started. if flushOnlyMode { go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig) } else { From 867507ac0a0b50790d4b3aa67d74134a27a7d101 Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Mon, 20 May 2024 17:14:24 -0500 Subject: [PATCH 5/6] Update Run to RunE --- cmd/process.go | 48 +++++++++++++++++++++--------------------------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/cmd/process.go b/cmd/process.go index 4bcc7c8..43cf14c 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -38,7 +38,7 @@ func Start(a *AppState) *cobra.Command { PersistentPreRun: func(cmd *cobra.Command, _ []string) { a.InitAppState() }, - Run: func(cmd *cobra.Command, args []string) { + RunE: func(cmd *cobra.Command, args []string) error { logger := a.Logger cfg := a.Config @@ -49,16 +49,14 @@ func Start(a *AppState) *cobra.Command { flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode) if err != nil { - logger.Error("Invalid flush only flag", "error", err) - os.Exit(1) + return fmt.Errorf("invalid flush only flag error=%e", err) } if flushInterval == 0 { if flushOnly { - logger.Error("Flush only mode requires a flush interval") - os.Exit(1) + return fmt.Errorf("flush only mode requires a flush interval") } else { - logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") + logger.Error("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush") } } @@ -72,8 +70,7 @@ func Start(a *AppState) *cobra.Command { port, err := cmd.Flags().GetInt16(flagMetricsPort) if err != nil { - logger.Error("Invalid port", "error", err) - os.Exit(1) + return fmt.Errorf("invalid port error=%e", err) } metrics := relayer.InitPromMetrics(port) @@ -81,15 +78,13 @@ func Start(a *AppState) *cobra.Command { for name, cfg := range cfg.Chains { c, err := cfg.Chain(name) if err != nil { - logger.Error("Error creating chain", "err: ", err) - os.Exit(1) + return fmt.Errorf("error creating chain error=%e", err) } logger = logger.With("name", c.Name(), "domain", c.Domain()) if err := c.InitializeClients(cmd.Context(), logger); err != nil { - logger.Error("Error initializing client", "err", err) - os.Exit(1) + return fmt.Errorf("error initializing client error=%e", err) } go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics) @@ -103,14 +98,12 @@ func Start(a *AppState) *cobra.Command { break } if i == maxRetries-1 { - logger.Error("Unable to get height") - os.Exit(1) + return fmt.Errorf("unable to get height") } } if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil { - logger.Error("Error initializing broadcaster", "error", err) - os.Exit(1) + return fmt.Errorf("error initializing broadcaster error=%e", err) } go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval) @@ -118,8 +111,7 @@ func Start(a *AppState) *cobra.Command { go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics) if _, ok := registeredDomains[c.Domain()]; ok { - logger.Error("Duplicate domain found", "domain", c.Domain(), "name:", c.Name()) - os.Exit(1) + return fmt.Errorf("duplicate domain found domain=%d name=%s", c.Domain(), c.Name()) } registeredDomains[c.Domain()] = c @@ -130,17 +122,19 @@ func Start(a *AppState) *cobra.Command { go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics) } - defer func() { - for _, c := range registeredDomains { - logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())) - err := c.CloseClients() - if err != nil { - logger.Error("Error closing clients", "error", err) - } + // wait for context to be done + <-cmd.Context().Done() + + // close clients & output latest block heights + for _, c := range registeredDomains { + logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())) + err := c.CloseClients() + if err != nil { + logger.Error("Error closing clients", "error", err) } - }() + } - <-cmd.Context().Done() + return nil }, } From e249b1b41ea62be88ccf0d0011652a76b8c9b33b Mon Sep 17 00:00:00 2001 From: Joel Smith Date: Tue, 21 May 2024 10:01:11 -0500 Subject: [PATCH 6/6] Fix error wrapping --- cmd/process.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/process.go b/cmd/process.go index 43cf14c..b9665d0 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -49,7 +49,7 @@ func Start(a *AppState) *cobra.Command { flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode) if err != nil { - return fmt.Errorf("invalid flush only flag error=%e", err) + return fmt.Errorf("invalid flush only flag error=%w", err) } if flushInterval == 0 { @@ -70,7 +70,7 @@ func Start(a *AppState) *cobra.Command { port, err := cmd.Flags().GetInt16(flagMetricsPort) if err != nil { - return fmt.Errorf("invalid port error=%e", err) + return fmt.Errorf("invalid port error=%w", err) } metrics := relayer.InitPromMetrics(port) @@ -78,13 +78,13 @@ func Start(a *AppState) *cobra.Command { for name, cfg := range cfg.Chains { c, err := cfg.Chain(name) if err != nil { - return fmt.Errorf("error creating chain error=%e", err) + return fmt.Errorf("error creating chain error=%w", err) } logger = logger.With("name", c.Name(), "domain", c.Domain()) if err := c.InitializeClients(cmd.Context(), logger); err != nil { - return fmt.Errorf("error initializing client error=%e", err) + return fmt.Errorf("error initializing client error=%w", err) } go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics) @@ -103,7 +103,7 @@ func Start(a *AppState) *cobra.Command { } if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil { - return fmt.Errorf("error initializing broadcaster error=%e", err) + return fmt.Errorf("error initializing broadcaster error=%w", err) } go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval)