diff --git a/internal/cache/cache.go b/internal/cache/cache.go index 7a16f9ac4..25d42b98a 100644 --- a/internal/cache/cache.go +++ b/internal/cache/cache.go @@ -6,15 +6,16 @@ import ( ) var ( - ErrCacheNotFound = errors.New("key not found in cache") - ErrCacheFailedToSet = errors.New("failed to set value in cache") - ErrCacheFailedToDel = errors.New("failed to delete value from cache") - ErrCacheFailedToGet = errors.New("failed to get value from cache") - ErrCacheFailedToMarshalValue = errors.New("failed to marshal value") + ErrCacheNotFound = errors.New("key not found in cache") + ErrCacheFailedToSet = errors.New("failed to set value in cache") + ErrCacheFailedToDel = errors.New("failed to delete value from cache") + ErrCacheFailedToGet = errors.New("failed to get value from cache") + ErrCacheFailedToScan = errors.New("failed to scan cache") ) type Store interface { Get(key string) ([]byte, error) + GetAllWithPrefix(prefix string) (map[string][]byte, error) Set(key string, value []byte, ttl time.Duration) error - Del(key string) error + Del(keys ...string) error } diff --git a/internal/cache/in_memory.go b/internal/cache/in_memory.go index 9d5c327e7..35f45a78a 100644 --- a/internal/cache/in_memory.go +++ b/internal/cache/in_memory.go @@ -32,7 +32,20 @@ func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error { } // Del removes a key from the store. 
-func (s *MemoryStore) Del(key string) error { - s.data.Delete(key) +func (s *MemoryStore) Del(keys ...string) error { + for _, k := range keys { + s.data.Delete(k) + } return nil } +func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { + keys := make(map[string][]byte) + s.data.Range(func(k, v interface{}) bool { + key := k.(string) + if len(key) >= len(prefix) && key[:len(prefix)] == prefix { + keys[key] = v.([]byte) + } + return true + }) + return keys, nil +} diff --git a/internal/cache/redis.go b/internal/cache/redis.go index 932f4ace0..eb5832b2c 100644 --- a/internal/cache/redis.go +++ b/internal/cache/redis.go @@ -43,8 +43,8 @@ func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error { } // Del removes a value by key. -func (r *RedisStore) Del(key string) error { - result, err := r.client.Del(r.ctx, key).Result() +func (r *RedisStore) Del(keys ...string) error { + result, err := r.client.Del(r.ctx, keys...).Result() if err != nil { return errors.Join(ErrCacheFailedToDel, err) } @@ -53,3 +53,33 @@ func (r *RedisStore) Del(key string) error { } return nil } + +// GetAllWithPrefix retrieves all key-value pairs that match a specific prefix. 
+func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) { + var cursor uint64 + results := make(map[string][]byte) + + for { + keys, newCursor, err := r.client.Scan(r.ctx, cursor, prefix+"*", 10).Result() + if err != nil { + return nil, errors.Join(ErrCacheFailedToScan, err) + } + + for _, key := range keys { + value, err := r.client.Get(r.ctx, key).Result() + if errors.Is(err, redis.Nil) { + // Key has been removed between SCAN and GET, skip it + continue + } else if err != nil { + return nil, errors.Join(ErrCacheFailedToGet, err) + } + results[key] = []byte(value) + } + + cursor = newCursor + if cursor == 0 { + break + } + } + return results, nil +} diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 84e1ff394..323e00e83 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -414,21 +414,18 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() { return case statusUpdate := <-p.storageStatusUpdateCh: // Ensure no duplicate statuses - actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate) + err := p.updateStatusMap(statusUpdate) if err != nil { p.logger.Error("failed to update status", slog.String("err", err.Error())) return } - - if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize { - p.checkAndUpdate(ctx, actualUpdateStatusMap) - - // Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed. - // This prevents unnecessary immediate updates and maintains the intended time interval between batches. 
- ticker.Reset(p.processStatusUpdatesInterval) - } case <-ticker.C: - statusUpdatesMap := p.getStatusUpdateMap() + statusUpdatesMap, err := p.getAllTransactionStatuses() + if err != nil { + p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error())) + return + } + if len(statusUpdatesMap) > 0 { p.checkAndUpdate(ctx, statusUpdatesMap) @@ -465,7 +462,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error())) } - err = p.cacheStore.Del(CacheStatusUpdateKey) + err = p.clearCache(statusUpdatesMap) if err != nil { p.logger.Error("failed to clear status update map", slog.String("err", err.Error())) } diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index e95bebe09..2b47910a2 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -6,12 +6,16 @@ import ( "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph/store" ) type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus -const CacheStatusUpdateKey = "status-updates" +const ( + CacheStatusUpdateKey = "status-updates" + CacheTxPrefix = "tx-" +) var ( ErrFailedToSerialize = errors.New("failed to serialize value") @@ -22,51 +26,86 @@ func (p *Processor) GetProcessorMapSize() int { return p.responseProcessor.getMapLen() } -func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (StatusUpdateMap, error) { - statusUpdatesMap := p.getStatusUpdateMap() - foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash] +func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) error { + currentStatusUpdate, err := p.getTransactionStatus(statusUpdate.Hash) + if err != nil { + if errors.Is(err, cache.ErrCacheNotFound) { + // if record doesn't exist, save new one + return 
p.setTransactionStatus(statusUpdate) + } + return err + } - if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) { + if shouldUpdateStatus(statusUpdate, *currentStatusUpdate) { if len(statusUpdate.CompetingTxs) > 0 { - statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs) + statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs) } - - statusUpdatesMap[statusUpdate.Hash] = statusUpdate + return p.setTransactionStatus(statusUpdate) // TODO: combine status history } - err := p.setStatusUpdateMap(statusUpdatesMap) + return nil +} + +func (p *Processor) setTransactionStatus(status store.UpdateStatus) error { + bytes, err := json.Marshal(status) if err != nil { - return statusUpdatesMap, err + return errors.Join(ErrFailedToSerialize, err) } - return statusUpdatesMap, nil + err = p.cacheStore.Set(CacheTxPrefix+status.Hash.String(), bytes, processStatusUpdatesIntervalDefault) + if err != nil { + return err + } + return nil } -func (p *Processor) setStatusUpdateMap(statusUpdatesMap StatusUpdateMap) error { - bytes, err := serializeStatusMap(statusUpdatesMap) +func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) { + bytes, err := p.cacheStore.Get(CacheTxPrefix + hash.String()) if err != nil { - return err + return nil, err } - err = p.cacheStore.Set(CacheStatusUpdateKey, bytes, processStatusUpdatesIntervalDefault) + var status store.UpdateStatus + err = json.Unmarshal(bytes, &status) if err != nil { - return err + return nil, err } - return nil + + return &status, nil } -func (p *Processor) getStatusUpdateMap() StatusUpdateMap { - existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey) +func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) { + statuses := make(StatusUpdateMap) + keys, err := p.cacheStore.GetAllWithPrefix(CacheTxPrefix) + if err != nil { + return nil, err + } - if err == nil { - statusUpdatesMap, err := 
deserializeStatusMap(existingMap) - if err == nil { - return statusUpdatesMap + for key, value := range keys { + hash, err := chainhash.NewHashFromStr(key[len(CacheTxPrefix):]) + if err != nil { + return nil, err + } + + var status store.UpdateStatus + err = json.Unmarshal(value, &status) + if err != nil { + return nil, err } + + statuses[*hash] = status + } + + return statuses, nil +} + +func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error { + keys := make([]string, 0, len(updateStatusMap)) + for k := range updateStatusMap { + keys = append(keys, CacheTxPrefix+k.String()) } - // If the key doesn't exist or there was an error unmarshalling the value return new map - return make(StatusUpdateMap) + return p.cacheStore.Del(keys...) } func shouldUpdateStatus(new, found store.UpdateStatus) bool { @@ -122,36 +161,3 @@ func mergeUnique(arr1, arr2 []string) []string { return uniqueSlice } - -func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) { - serializeMap := make(map[string]store.UpdateStatus) - for k, v := range updateStatusMap { - serializeMap[k.String()] = v - } - - bytes, err := json.Marshal(serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToSerialize, err) - } - return bytes, nil -} - -func deserializeStatusMap(data []byte) (StatusUpdateMap, error) { - serializeMap := make(map[string]store.UpdateStatus) - updateStatusMap := make(StatusUpdateMap) - - err := json.Unmarshal(data, &serializeMap) - if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - - for k, v := range serializeMap { - hash, err := chainhash.NewHashFromStr(k) - if err != nil { - return nil, errors.Join(ErrFailedToDeserialize, err) - } - updateStatusMap[*hash] = v - } - - return updateStatusMap, nil -}