Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor listener; add flush #58

Merged
merged 15 commits into from
Mar 21, 2024
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ Running the relayer
noble-cctp-relayer start --config ./config/sample-app-config.yaml
```
Sample configs can be found in [config](config).
### Promethius Metrics

### Flush Interval

Using the `--flush-interval` flag will run a flush on all paths every `duration`; ex `--flush-interval 5m`

The relayer will keep track of the latest flushed block. The first time the flush is run, the flush will start at the chains latest height - lookback period and flush up until height of the chain when the flush started. It will then store the height the flush ended on.

After that, it will flush from the last stored height - lookback period up until the latest height of the chain.

### Prometheus Metrics

By default, metrics are exported at on port :2112/metrics (`http://localhost:2112/metrics`). You can customize the port using the `--metrics-port` flag.

Expand Down
4 changes: 2 additions & 2 deletions cmd/appstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@ func (a *AppState) loadConfigFile() {
}
config, err := ParseConfig(a.ConfigPath)
if err != nil {
a.Logger.Error("unable to parse config file", "location", a.ConfigPath, "err", err)
a.Logger.Error("Unable to parse config file", "location", a.ConfigPath, "err", err)
os.Exit(1)
}
a.Logger.Info("successfully parsed config file", "location", a.ConfigPath)
a.Logger.Info("Successfully parsed config file", "location", a.ConfigPath)
a.Config = config

}
12 changes: 7 additions & 5 deletions cmd/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@ import (
)

const (
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagConfigPath = "config"
flagVerbose = "verbose"
flagLogLevel = "log-level"
flagJSON = "json"
flagMetricsPort = "metrics-port"
flagFlushInterval = "flush-interval"
)

func addAppPersistantFlags(cmd *cobra.Command, a *AppState) *cobra.Command {
cmd.PersistentFlags().StringVar(&a.ConfigPath, flagConfigPath, defaultConfigPath, "file path of config file")
cmd.PersistentFlags().BoolVarP(&a.Debug, flagVerbose, "v", false, fmt.Sprintf("use this flag to set log level to `debug` (overrides %s flag)", flagLogLevel))
cmd.PersistentFlags().StringVar(&a.LogLevel, flagLogLevel, "info", "log level (debug, info, warn, error)")
cmd.PersistentFlags().Int16P(flagMetricsPort, "p", 2112, "customize Prometheus metrics port")
cmd.PersistentFlags().DurationP(flagFlushInterval, "i", 0, "how frequently should a flush routine be run")
return cmd

}
Expand Down
30 changes: 29 additions & 1 deletion cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

flushInterval, err := cmd.Flags().GetDuration(flagFlushInterval)
if err != nil {
logger.Error("invalid flush interval", "error", err)
}
if flushInterval == 0 {
logger.Info("flush interval not set. Use the --flush-interval flag to set a reoccurring flush")
}

metrics := relayer.InitPromMetrics(port)

for name, cfg := range cfg.Chains {
Expand All @@ -63,12 +71,26 @@ func Start(a *AppState) *cobra.Command {
os.Exit(1)
}

logger = logger.With("name", c.Name(), "domain", c.Domain())

if err := c.InitializeClients(cmd.Context(), logger); err != nil {
logger.Error("error initializing client", "err", err)
os.Exit(1)
}

go c.TrackLatestBlockHeight(cmd.Context(), logger)

// wait until height is available
for c.LatestBlock() == 0 {
time.Sleep(1 * time.Second)
boojamya marked this conversation as resolved.
Show resolved Hide resolved
}

if err := c.InitializeBroadcaster(cmd.Context(), logger, sequenceMap); err != nil {
logger.Error("Error initializing broadcaster", "error", err)
os.Exit(1)
}

go c.StartListener(cmd.Context(), logger, processingQueue)
go c.StartListener(cmd.Context(), logger, processingQueue, flushInterval)
go c.WalletBalanceMetric(cmd.Context(), a.Logger, metrics)

if _, ok := registeredDomains[c.Domain()]; ok {
Expand All @@ -85,6 +107,12 @@ func Start(a *AppState) *cobra.Command {
}

<-cmd.Context().Done()
// clean up
time.Sleep(20 * time.Millisecond)
for _, c := range registeredDomains {
fmt.Printf("\n%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())
c.CloseClients()
}
boojamya marked this conversation as resolved.
Show resolved Hide resolved
},
}

Expand Down
12 changes: 3 additions & 9 deletions ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/ethereum/contracts"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)
Expand All @@ -40,14 +39,9 @@ func (e *Ethereum) Broadcast(
sequenceMap *types.SequenceMap,
) error {

// set up eth client
client, err := ethclient.Dial(e.rpcURL)
if err != nil {
return fmt.Errorf("unable to dial ethereum client: %w", err)
}
defer client.Close()
logger = logger.With("chain", e.name, "chain_id", e.chainID, "domain", e.domain)

backend := NewContractBackendWrapper(client)
backend := NewContractBackendWrapper(e.rpcClient)

auth, err := bind.NewKeyedTransactorWithChainID(e.privateKey, big.NewInt(e.chainID))
if err != nil {
Expand Down Expand Up @@ -149,7 +143,7 @@ func (e *Ethereum) attemptBroadcast(
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.SourceDomain, msg.Nonce), "src-tx", msg.SourceTxHash, "reviever")
msg.Status = types.Complete
return nil
}
Expand Down
53 changes: 53 additions & 0 deletions ethereum/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package ethereum

import (
"bytes"
"context"
"crypto/ecdsa"
"embed"
"encoding/hex"
"fmt"
"strings"
"sync"

"cosmossdk.io/log"

"github.com/ethereum/go-ethereum/ethclient"
"github.com/strangelove-ventures/noble-cctp-relayer/types"
)

Expand All @@ -17,6 +22,7 @@ var content embed.FS
var _ types.Chain = (*Ethereum)(nil)

type Ethereum struct {
// from conifg
name string
chainID int64
domain types.Domain
Expand All @@ -34,6 +40,12 @@ type Ethereum struct {
MetricsExponent int

mu sync.Mutex

wsClient *ethclient.Client
rpcClient *ethclient.Client

latestBlock uint64
lastFlushedBlock uint64
}

func NewChain(
Expand Down Expand Up @@ -83,6 +95,23 @@ func (e *Ethereum) Domain() types.Domain {
return e.domain
}

func (e *Ethereum) LatestBlock() uint64 {
e.mu.Lock()
block := e.latestBlock
e.mu.Unlock()
return block
}

func (e *Ethereum) SetLatestBlock(block uint64) {
e.mu.Lock()
e.latestBlock = block
e.mu.Unlock()
}

func (e *Ethereum) LastFlushedBlock() uint64 {
return e.lastFlushedBlock
}

func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool {
zeroByteArr := make([]byte, 32)

Expand All @@ -96,3 +125,27 @@ func (e *Ethereum) IsDestinationCaller(destinationCaller []byte) bool {

return bytes.Equal(destinationCaller, zeroByteArr) || bytes.Equal(destinationCaller, decodedMinterPadded)
}

func (e *Ethereum) InitializeClients(ctx context.Context, logger log.Logger) error {
var err error

e.wsClient, err = ethclient.DialContext(ctx, e.wsURL)
if err != nil {
return fmt.Errorf("unable to initialize websocket ethereum client; err: %w", err)
}

e.rpcClient, err = ethclient.DialContext(ctx, e.rpcURL)
if err != nil {
return fmt.Errorf("unable to initialize rpc ethereum client; err: %w", err)
}
return nil
}

func (e *Ethereum) CloseClients() {
if e.wsClient != nil {
e.wsClient.Close()
}
if e.rpcClient != nil {
e.rpcClient.Close()
}
}
Loading
Loading