diff --git a/cmd/process.go b/cmd/process.go index 23f0451..ae7a3f9 100644 --- a/cmd/process.go +++ b/cmd/process.go @@ -170,40 +170,35 @@ func StartProcessor( // if the message is burned or pending, check for an attestation if msg.Status == types.Created || msg.Status == types.Pending { response := circle.CheckAttestation(cfg.Circle.AttestationBaseUrl, logger, msg.IrisLookupId, msg.SourceTxHash, msg.SourceDomain, msg.DestDomain) - if response != nil { - if msg.Status == types.Created && response.Status == "pending_confirmations" { - logger.Debug("Attestation is created but still pending confirmations for 0x" + msg.IrisLookupId + ". Retrying...") - State.Mu.Lock() - msg.Status = types.Pending - msg.Updated = time.Now() - State.Mu.Unlock() - time.Sleep(10 * time.Second) - requeue = true - continue - } else if response.Status == "pending_confirmations" { - logger.Debug("Attestation is still pending for 0x" + msg.IrisLookupId + ". Retrying...") - time.Sleep(10 * time.Second) - requeue = true - continue - } else if response.Status == "complete" { - logger.Debug("Attestation is complete for 0x" + msg.IrisLookupId + ". Retrying...") - State.Mu.Lock() - msg.Status = types.Attested - msg.Attestation = response.Attestation - msg.Updated = time.Now() - broadcastMsgs[msg.DestDomain] = append(broadcastMsgs[msg.DestDomain], msg) - State.Mu.Unlock() - } - } else { - // add attestation retry intervals per domain here + + if response == nil { logger.Debug("Attestation is still processing for 0x" + msg.IrisLookupId + ". Retrying...") - time.Sleep(10 * time.Second) - // retry requeue = true continue + } else if msg.Status == types.Created && response.Status == "pending_confirmations" { + logger.Debug("Attestation is created but still pending confirmations for 0x" + msg.IrisLookupId + ". Retrying...") + State.Mu.Lock() + msg.Status = types.Pending + msg.Updated = time.Now() + State.Mu.Unlock() + requeue = true + continue + } else if response.Status == "pending_confirmations" { + logger.Debug("Attestation is still pending for 0x" + msg.IrisLookupId + ". Retrying...") + requeue = true + continue + } else if response.Status == "complete" { + logger.Debug("Attestation is complete for 0x" + msg.IrisLookupId + ".") + State.Mu.Lock() + msg.Status = types.Attested + msg.Attestation = response.Attestation + msg.Updated = time.Now() + broadcastMsgs[msg.DestDomain] = append(broadcastMsgs[msg.DestDomain], msg) + State.Mu.Unlock() } } } + // if the message is attested to, try to broadcast for domain, msgs := range broadcastMsgs { chain, ok := registeredDomains[domain] @@ -224,10 +219,17 @@ func StartProcessor( msg.Updated = time.Now() } State.Mu.Unlock() - } + + // requeue txs, ensure not to exceed retry limit if requeue { - processingQueue <- tx + if dequeuedTx.RetryAttempt < cfg.Circle.FetchRetries { + dequeuedTx.RetryAttempt++ + time.Sleep(time.Duration(cfg.Circle.FetchRetryInterval) * time.Second) + processingQueue <- tx + } else { + logger.Error("Retry limit exceeded for tx", "limit", cfg.Circle.FetchRetries, "tx", dequeuedTx.TxHash) + } } } } diff --git a/types/message_state.go b/types/message_state.go index 27c665e..55fbd76 100644 --- a/types/message_state.go +++ b/types/message_state.go @@ -27,8 +27,9 @@ const ( type Domain uint32 type TxState struct { - TxHash string - Msgs []*MessageState + TxHash string + Msgs []*MessageState + RetryAttempt int } type MessageState struct {