Skip to content

Commit

Permalink
update error sig
Browse files Browse the repository at this point in the history
  • Loading branch information
boojamya committed Mar 18, 2024
1 parent 23d84ff commit a591caf
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 64 deletions.
1 change: 1 addition & 0 deletions cmd/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func Start(a *AppState) *cobra.Command {

<-cmd.Context().Done()
// clean up
time.Sleep(20 * time.Millisecond)
for _, c := range registeredDomains {
fmt.Printf("\n%s: latest-block: %d last-flushed-block: %d", c.Name(), c.LatestBlock(), c.LastFlushedBlock())
c.CloseClients()
Expand Down
2 changes: 1 addition & 1 deletion ethereum/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (e *Ethereum) attemptBroadcast(
if response.Uint64() == uint64(1) {
// nonce has already been used, mark as complete
logger.Debug(fmt.Sprintf("This source domain/nonce has already been used: %d %d",
msg.SourceDomain, msg.Nonce))
msg.SourceDomain, msg.Nonce), "src-tx", msg.SourceTxHash, "reviever")
msg.Status = types.Complete
return nil
}
Expand Down
112 changes: 52 additions & 60 deletions ethereum/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"os"
"sync"
"time"

"cosmossdk.io/log"
Expand All @@ -24,29 +23,15 @@ var (
messageTransmitterABI abi.ABI
messageSent abi.Event
messageTransmitterAddress common.Address
processingQueue chan *types.TxState
)

type errorSync struct {
sync.Mutex
trigger chan struct{}
err error
next *errorSync
}

func NewErrorSync() *errorSync {
return &errorSync{
trigger: make(chan struct{}),
}
}
// errSignal allows broadcasting an error value to multiple receivers.
type errSignal struct {
Ready chan struct{}

func (es *errorSync) SetError(err error) {
es.Lock()
defer es.Unlock()
if es.err != nil {
return
}
es.err = err
close(es.trigger)
Err error
Next *errSignal
}

func (e *Ethereum) StartListener(
Expand All @@ -70,44 +55,59 @@ func (e *Ethereum) StartListener(
messageSent = messageTransmitterABI.Events["MessageSent"]
messageTransmitterAddress = common.HexToAddress(e.messageTransmitterAddress)

e.startListenerRoutines(ctx, logger, processingQueue)

e.startListenerRoutines(ctx, logger)
}

// startListenerRoutines starts the ethereum websocket subscription, queries history pertaining to the lookback period,
// and starts the reoccurring flush

// If an error occurs in the websocket stream, this function will handle the relevant subroutines and then re-run itself.
func (e *Ethereum) startListenerRoutines(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
) {

sig := &errSignal{
Ready: make(chan struct{}),
}

// start main stream (does not account for lookback period or specific start block)
stream, sub, history := e.startMainStream(ctx, logger)
ethSubErrorSync := NewErrorSync()

go func() {
err := <-sub.Err()
logger.Error("websocket disconnected. Will re-connect", "err", err)
ethSubErrorSync.SetError(err)
}()
go e.consumeStream(ctx, logger, stream, sig)
consumeHistroy(logger, history)

go e.consumeStream(ctx, logger, processingQueue, stream, ethSubErrorSync)
consumeHistroy(logger, history, processingQueue)

// get history for start block and lookback period
// get history from start-lookback up until latest block
latestBlock := e.LatestBlock()
start := latestBlock
if e.startBlock != 0 {
start = e.startBlock
}
start -= e.lookbackPeriod
logger.Info(fmt.Sprintf("getting history: start block:%d looking back %d blocks", e.startBlock, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, processingQueue, start, latestBlock)
startLookback := start - e.lookbackPeriod
logger.Info(fmt.Sprintf("getting history from %d: starting at:%d and looking back %d blocks", startLookback, start, e.lookbackPeriod))
e.getAndConsumeHistory(ctx, logger, startLookback, latestBlock)

logger.Info("finished getting history")

// start flush timer
go e.flushMechanism(ctx, logger, processingQueue, ethSubErrorSync)
go e.flushMechanism(ctx, logger, sig)

// listen for errors in the main websocket stream
// if error occurs, trigger sig.Ready
// This will cancel `consumeStream` and `flushMechanism` routines
select {
case <-ctx.Done():
return
case err := <-sub.Err():
logger.Error("websocket disconnected. Reconnecting...", "err", err)
close(sig.Ready)

// restart
e.startBlock = e.lastFlushedBlock
time.Sleep(10 * time.Millisecond)
e.startListenerRoutines(ctx, logger)
return
}

}

func (e *Ethereum) startMainStream(
Expand Down Expand Up @@ -150,7 +150,6 @@ func (e *Ethereum) startMainStream(
func (e *Ethereum) getAndConsumeHistory(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
start, end uint64) {

var toUnSub ethereum.Subscription
Expand Down Expand Up @@ -194,7 +193,7 @@ func (e *Ethereum) getAndConsumeHistory(
break
}
toUnSub.Unsubscribe()
consumeHistroy(logger, history, processingQueue)
consumeHistroy(logger, history)

start += chunkSize
chunk++
Expand All @@ -206,7 +205,6 @@ func (e *Ethereum) getAndConsumeHistory(
func consumeHistroy(
logger log.Logger,
history []ethtypes.Log,
processingQueue chan *types.TxState,
) {
for _, historicalLog := range history {
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &historicalLog)
Expand All @@ -225,22 +223,17 @@ func consumeHistroy(
func (e *Ethereum) consumeStream(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
stream <-chan ethtypes.Log,
errSync *errorSync,
sig *errSignal,
) {
logger.Debug("consuming incoming messages")
var txState *types.TxState
for {
select {
case <-ctx.Done():
return
case <-errSync.trigger:
// setting start block to 0 will start listener from latest height.
// in the rare case we are waiting for the websocket to come back online for a long period of time,
// we'll rely on the latestFlushBlock to flush out all missed transactions
e.startBlock = 0
logger.Error("stream routine stopped")
e.startListenerRoutines(ctx, logger, processingQueue)
case <-sig.Ready:
logger.Debug("websocket disconnected...stopped consuming stream")
return
case streamLog := <-stream:
parsedMsg, err := types.EvmLogToMessageState(messageTransmitterABI, messageSent, &streamLog)
Expand Down Expand Up @@ -270,8 +263,7 @@ func (e *Ethereum) consumeStream(
func (e *Ethereum) flushMechanism(
ctx context.Context,
logger log.Logger,
processingQueue chan *types.TxState,
errSync *errorSync,
sig *errSignal,
) {

for {
Expand All @@ -288,16 +280,16 @@ func (e *Ethereum) flushMechanism(

logger.Info(fmt.Sprintf("flush started from %d to %d", start, latestBlock))

e.getAndConsumeHistory(ctx, logger, processingQueue, start, latestBlock)
e.getAndConsumeHistory(ctx, logger, start, latestBlock)

e.lastFlushedBlock = latestBlock

logger.Info("flush complete")

// if main websocket stream is disconnected, stop flush. It will be restarted once websocket is reconnected
case <-errSync.trigger:
case <-sig.Ready:
timer.Stop()
logger.Info("flush stopped. Will restart after websocket is re-established")
logger.Debug("websocket disconnected. Flush stopped. Will restart after websocket is re-established")
return
case <-ctx.Done():
timer.Stop()
Expand All @@ -312,7 +304,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
// first time
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Error("Error getting lastest block height:", err)
logger.Error(fmt.Sprintf("error getting lastest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
}
if err == nil {
e.SetLatestBlock(header.Number.Uint64())
Expand All @@ -325,7 +317,7 @@ func (e *Ethereum) TrackLatestBlockHeight(ctx context.Context, logger log.Logger
case <-timer.C:
header, err := e.rpcClient.HeaderByNumber(ctx, nil)
if err != nil {
logger.Error("Error getting lastest block height:", err)
logger.Debug(fmt.Sprintf("error getting lastest block height. Will retry in %.2f second:", loop.Seconds()), "err", err)
continue
}
e.SetLatestBlock(header.Number.Uint64())
Expand Down Expand Up @@ -356,7 +348,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
timer.Stop()
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err)
logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
continue
}

Expand All @@ -367,7 +359,7 @@ func (e *Ethereum) WalletBalanceMetric(ctx context.Context, logger log.Logger, m
case <-timer.C:
balance, err := e.rpcClient.BalanceAt(ctx, account, nil)
if err != nil {
logger.Error(fmt.Sprintf("error querying balance. Will try again in %d sec", queryRate), "error", err)
logger.Error(fmt.Sprintf("error querying balance. Will try again in %.2f sec", queryRate.Seconds()), "error", err)
continue
}

Expand Down
9 changes: 7 additions & 2 deletions noble/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (n *Noble) Broadcast(
}

// Log retry information
logger.Error("Broadcasting to noble failed. Retrying...", "error", err, "interval_seconds", n.retryIntervalSeconds)
logger.Error(fmt.Sprintf("Broadcasting to noble failed. Attempt %d/%d Retrying...", attempt, n.maxRetries), "error", err, "interval_seconds", n.retryIntervalSeconds, "src-tx", msgs[0].SourceTxHash)
time.Sleep(time.Duration(n.retryIntervalSeconds) * time.Second)
}

Expand Down Expand Up @@ -99,7 +99,12 @@ func (n *Noble) attemptBroadcast(

if used {
msg.Status = types.Complete
logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used", msg.Nonce))
// bm, _ := new(cctptypes.BurnMessage).Parse(msg.MsgBody)
// x, err := hex.DecodeString(string(bm.MintRecipient))
// fmt.Println("err", err)
// y := common.HexToAddress(string(x))
// fmt.Println("ERRR", err)
logger.Info(fmt.Sprintf("Noble cctp minter nonce %d already used.", msg.Nonce), "src-tx", msg.SourceTxHash)
continue
}

Expand Down
2 changes: 1 addition & 1 deletion noble/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (n *Noble) StartListener(
block := <-blockQueue
res, err := n.cc.RPCClient.TxSearch(ctx, fmt.Sprintf("tx.height=%d", block), false, nil, nil, "")
if err != nil || res == nil {
logger.Debug(fmt.Sprintf("unable to query Noble block %d", block), "error:", err)
logger.Debug(fmt.Sprintf("unable to query Noble block %d. Will retry.", block), "error:", err)
blockQueue <- block
continue
}
Expand Down

0 comments on commit a591caf

Please sign in to comment.