feat: flush only mode #91

Merged (6 commits) on May 21, 2024
8 changes: 8 additions & 0 deletions README.md
@@ -33,6 +33,14 @@ Consider a 30 minute flush interval (1800 seconds)
- Polygon: 2 second blocks = (1800 / 2) = `900 blocks`
- Arbitrum: 0.26 second blocks = (1800 / 0.26) = `~6950 blocks`

### Flush Only Mode

This relayer also supports a `--flush-only-mode` flag. In this mode, the relayer only flushes the chain and does not actively listen for new events as they occur. This is useful for running a secondary relayer that "lags" behind the primary relayer and is responsible only for retrying failed transactions.

When the relayer is in flush only mode, the flush mechanism will start at `latest height - (4 * lookback period)` and finish at `latest height - (3 * lookback period)`. For all subsequent flushes, the relayer will start at the last flushed block and finish at `latest height - (3 * lookback period)`. Please see the notes above for configuring the flush interval and lookback period.

> Note: It is highly recommended to use the same configuration for both the primary and secondary relayer. This ensures that there is zero overlap between the relayers.
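As a rough illustration of the window arithmetic above, the sketch below computes the first and subsequent flush ranges. It is illustrative only; `flushWindow` and its parameters are hypothetical names, not part of the relayer's API.

```go
package main

import "fmt"

// flushWindow returns the block range a flush-only relayer would cover,
// following the rule described above: the first flush spans
// [latest - 4*lookback, latest - 3*lookback]; later flushes resume at the
// last flushed block and stop at latest - 3*lookback.
func flushWindow(latest, lastFlushed, lookback uint64) (start, end uint64) {
	end = latest - 3*lookback
	if lastFlushed == 0 {
		return latest - 4*lookback, end
	}
	return lastFlushed, end
}

func main() {
	// Example: a 900-block lookback (roughly 30 minutes of Polygon blocks).
	start, end := flushWindow(50_000, 0, 900)
	fmt.Println(start, end) // 46400 47300 (the first flush window)
}
```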

### Prometheus Metrics

By default, metrics are exported on port `2112` at `/metrics` (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.
18 changes: 10 additions & 8 deletions circle/attestation.go
@@ -15,13 +15,6 @@ import (

// CheckAttestation checks the iris api for attestation status and returns true if attestation is complete
func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID string, txHash string, sourceDomain, destDomain types.Domain) *types.AttestationResponse {
logger.Debug(fmt.Sprintf("Checking attestation for %s%s%s for source tx %s from %d to %d", attestationURL, "0x", irisLookupID, txHash, sourceDomain, destDomain))

client := http.Client{Timeout: 2 * time.Second}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// append ending / if not present
if attestationURL[len(attestationURL)-1:] != "/" {
attestationURL += "/"
@@ -32,22 +25,30 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str
irisLookupID = "0x" + irisLookupID
}

logger.Debug(fmt.Sprintf("Checking attestation for %s%s for source tx %s from %d to %d", attestationURL, irisLookupID, txHash, sourceDomain, destDomain))

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, attestationURL+irisLookupID, nil)
if err != nil {
logger.Debug("error creating request: " + err.Error())
return nil
}

client := http.Client{}
rawResponse, err := client.Do(req)
if err != nil {
logger.Debug("error during request: " + err.Error())
return nil
}

defer rawResponse.Body.Close()
if rawResponse.StatusCode != http.StatusOK {
logger.Debug("non 200 response received from Circles attestation API")
return nil
}

body, err := io.ReadAll(rawResponse.Body)
if err != nil {
logger.Debug("unable to parse message body")
@@ -60,7 +61,8 @@ func CheckAttestation(attestationURL string, logger log.Logger, irisLookupID str
logger.Debug("unable to unmarshal response")
return nil
}
logger.Info(fmt.Sprintf("Attestation found for %s%s%s", attestationURL, "0x", irisLookupID))

logger.Info(fmt.Sprintf("Attestation found for %s%s", attestationURL, irisLookupID))

return &response
}
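The reworked request path above relies on a context deadline rather than an `http.Client` timeout, so a single 5-second budget covers dialing, sending the request, and reading the response. A minimal standalone sketch of that pattern (the URL and deadline here are placeholders, not the relayer's configuration):

```go
package main

import (
	"context"
	"io"
	"log"
	"net/http"
	"time"
)

func main() {
	// The deadline lives on the context, so it bounds connection setup,
	// the request itself, and reading the response body.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "https://example.com/attestations/0xabc", nil)
	if err != nil {
		log.Fatal(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// A blown deadline surfaces here as a context.DeadlineExceeded error.
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("status=%d bytes=%d", resp.StatusCode, len(body))
}
```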
2 changes: 2 additions & 0 deletions cmd/flags.go
@@ -13,6 +13,7 @@ const (
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagFlushInterval = "flush-interval"
flagFlushOnlyMode = "flush-only-mode"
)

func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
@@ -21,6 +22,7 @@ func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port")
cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run")
cmd.PersistentFlags().BoolP(flagFlushOnlyMode, "f", false, "only run the background flush routine (acts as a redundant relayer)")
return cmd
}

70 changes: 39 additions & 31 deletions cmd/process.go
@@ -38,10 +38,29 @@ func Start(a *AppState) *cobra.Command {
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
a.InitAppState()
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
logger := a.Logger
cfg := a.Config

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("Invalid flush interval", "error", err)
}

flushOnly, err := cmd.Flags().GetBool(flagFlushOnlyMode)
if err != nil {
return fmt.Errorf("invalid flush only flag error=%w", err)
}

if flushInterval == 0 {
if flushOnly {
return fmt.Errorf("flush only mode requires a flush interval")
} else {
logger.Error("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}
}

// start API on normal relayer only
go startAPI(a)

// messageState processing queue
@@ -51,32 +70,21 @@

port, err := cmd.Flags().GetInt16(flagMetricsPort)
if err != nil {
logger.Error("Invalid port", "error", err)
os.Exit(1)
}

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("Invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("Flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
return fmt.Errorf("invalid port error=%w", err)
}

metrics := relayer.InitPromMetrics(port)

for name, cfg := range cfg.Chains {
c, err := cfg.Chain(name)
if err != nil {
logger.Error("Error creating chain", "err: ", err)
os.Exit(1)
return fmt.Errorf("error creating chain error=%w", err)
}

logger = logger.With("name", c.Name(), "domain", c.Domain())

if err := c.InitializeClients(cmd.Context(), logger); err != nil {
logger.Error("Error initializing client", "err", err)
os.Exit(1)
return fmt.Errorf("error initializing client error=%w", err)
}

go c.TrackLatestBlockHeight(cmd.Context(), logger, metrics)
@@ -90,22 +98,20 @@ func Start(a *AppState) *cobra.Command {
break
}
if i == maxRetries-1 {
logger.Error("Unable to get height")
os.Exit(1)
return fmt.Errorf("unable to get height")
}
}

if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil {
logger.Error("Error initializing broadcaster", "error", err)
os.Exit(1)
return fmt.Errorf("error initializing broadcaster error=%w", err)
}

go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval)
go c.StartListener(cmd.Context(), logger, processingQueue, flushOnly, flushInterval)

go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics)

if _, ok := registeredDomains[c.Domain()]; ok {
logger.Error("Duplicate domain found", "domain", c.Domain(), "name:", c.Name())
os.Exit(1)
return fmt.Errorf("duplicate domain found domain=%d name=%s", c.Domain(), c.Name())
}

registeredDomains[c.Domain()] = c
@@ -116,17 +122,19 @@ func Start(a *AppState) *cobra.Command {
go StartProcessor(cmd.Context(), a, registeredDomains, processingQueue, sequenceMap, metrics)
}

defer func() {
for _, c := range registeredDomains {
logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock()))
err := c.CloseClients()
if err != nil {
logger.Error("Error closing clients", "error", err)
}
// wait for context to be done
<-cmd.Context().Done()

// close clients & output latest block heights
for _, c := range registeredDomains {
logger.Info(fmt.Sprintf("%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock()))
err := c.CloseClients()
if err != nil {
logger.Error("Error closing clients", "error", err)
}
}()
}

<-cmd.Context().Done()
return nil
},
}
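The diff above also switches the command from `Run` with `os.Exit` to `RunE`, so failures propagate back through cobra instead of terminating the process mid-setup. A minimal sketch of that pattern (the `demo` command and `ok` flag are hypothetical, not the relayer's CLI):

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		// RunE returns the error rather than calling os.Exit inside the
		// command body, so cleanup in the caller still runs and cobra
		// reports the error from Execute.
		RunE: func(cmd *cobra.Command, args []string) error {
			ok, err := cmd.Flags().GetBool("ok")
			if err != nil {
				return fmt.Errorf("invalid flag error=%w", err)
			}
			if !ok {
				return fmt.Errorf("something went wrong")
			}
			return nil
		},
	}
	cmd.Flags().Bool("ok", false, "succeed instead of failing")

	if err := cmd.Execute(); err != nil {
		os.Exit(1) // exit once, at the outermost layer
	}
}
```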

82 changes: 48 additions & 34 deletions ethereum/listener.go
@@ -33,6 +33,7 @@ func (e *Ethereum) StartListener(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
flushOnlyMode bool,
flushInterval time.Duration,
) {
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)
@@ -55,43 +56,48 @@
Ready: make(chan struct{}),
}

// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress)
// FlushOnlyMode is used for the secondary, flush only relayer. When enabled, the main stream is not started.
if flushOnlyMode {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig)
} else {
// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger, messageSent, messageTransmitterAddress)

go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)
go e.consumeStream(ctx, logger, processingQueue, messageSent, messageTransmitterABI, stream, sig)
consumeHistory(logger, history, processingQueue, messageSent, messageTransmitterABI)

// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
startLookback := start - e.lookbackPeriod
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock)
// get history from (start block - lookback) up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
startLookback := start - e.lookbackPeriod

logger.Info("Finished getting history")
logger.Info(fmt.Sprintf("Getting history from %d: starting at: %d looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, startLookback, latestBlock)
logger.Info("Finished getting history")

if flushInterval > 0 {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushInterval, sig)
}
if flushInterval > 0 {
go e.flushMechanism(ctx, logger, processingQueue, messageSent, messageTransmitterAddress, messageTransmitterABI, flushOnlyMode, flushInterval, sig)
}

// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
// This will cancel `consumeStream` and `flushMechanism` routines
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("Websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.StartListener(ctx, logger, processingQueue, flushInterval)
return
// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
// This will cancel `consumeStream` and `flushMechanism` routines
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("Websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.StartListener(ctx, logger, processingQueue, flushOnlyMode, flushInterval)
return
}
}
}

@@ -278,11 +284,19 @@ func (e *Ethereum) flushMechanism(
messageSent abi.Event,
messageTransmitterAddress common.Address,
messageTransmitterABI abi.ABI,
flushOnlyMode bool,
flushInterval time.Duration,
sig *errSignal,
) {
logger.Info(fmt.Sprintf("Starting flush mechanism. Will flush every %v", flushInterval))

// extraFlushBlocks is used to add an extra space between latest height and last flushed block
// this setting should only be used for the secondary, flush only relayer
extraFlushBlocks := uint64(0)
if flushOnlyMode {
extraFlushBlocks = 2 * e.lookbackPeriod
}

for {
timer := time.NewTimer(flushInterval)
select {
@@ -291,7 +305,7 @@

// initialize first lastFlushedBlock if not set
if e.lastFlushedBlock == 0 {
e.lastFlushedBlock = latestBlock - 2*e.lookbackPeriod
e.lastFlushedBlock = latestBlock - (2*e.lookbackPeriod + extraFlushBlocks)

if latestBlock < e.lookbackPeriod {
e.lastFlushedBlock = 0
@@ -302,7 +316,7 @@
startBlock := e.lastFlushedBlock

// set finish block to be latestBlock - lookbackPeriod
finishBlock := latestBlock - e.lookbackPeriod
finishBlock := latestBlock - (e.lookbackPeriod + extraFlushBlocks)

if startBlock >= finishBlock {
logger.Debug("No new blocks to flush")
2 changes: 1 addition & 1 deletion ethereum/listener_test.go
@@ -28,7 +28,7 @@ func TestStartListener(t *testing.T) {

processingQueue := make(chan *types.TxState, 10000)

go eth.StartListener(ctx, a.Logger, processingQueue, 0)
go eth.StartListener(ctx, a.Logger, processingQueue, false, 0)

time.Sleep(5 * time.Second)

2 changes: 1 addition & 1 deletion integration/eth_burn_to_noble_mint_test.go
@@ -73,7 +73,7 @@ func TestEthBurnToNobleMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go ethChain.StartListener(ctx, a.Logger, processingQueue, 0)
go ethChain.StartListener(ctx, a.Logger, processingQueue, false, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

_, _, generatedWallet := testdata.KeyTestPubAddr()
2 changes: 1 addition & 1 deletion integration/noble_burn_to_eth_mint_test.go
@@ -79,7 +79,7 @@ func TestNobleBurnToEthMint(t *testing.T) {

processingQueue := make(chan *types.TxState, 10)

go nobleChain.StartListener(ctx, a.Logger, processingQueue, 0)
go nobleChain.StartListener(ctx, a.Logger, processingQueue, false, 0)
go cmd.StartProcessor(ctx, a, registeredDomains, processingQueue, sequenceMap, nil)

ethDestinationAddress, _, err := generateEthWallet()