Skip to content

Commit

Permalink
feat(ARCO-283): save separate transactions in cache
Browse files Browse the repository at this point in the history
  • Loading branch information
pawellewandowski98 committed Nov 14, 2024
1 parent 2b8f479 commit b3cddd5
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 79 deletions.
13 changes: 7 additions & 6 deletions internal/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
)

var (
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToMarshalValue = errors.New("failed to marshal value")
ErrCacheNotFound = errors.New("key not found in cache")
ErrCacheFailedToSet = errors.New("failed to set value in cache")
ErrCacheFailedToDel = errors.New("failed to delete value from cache")
ErrCacheFailedToGet = errors.New("failed to get value from cache")
ErrCacheFailedToScan = errors.New("failed to scan cache")
)

type Store interface {
Get(key string) ([]byte, error)
GetAllWithPrefix(prefix string) (map[string][]byte, error)
Set(key string, value []byte, ttl time.Duration) error
Del(key string) error
Del(keys ...string) error
}
17 changes: 15 additions & 2 deletions internal/cache/in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,20 @@ func (s *MemoryStore) Set(key string, value []byte, _ time.Duration) error {
}

// Del removes a key from the store.
func (s *MemoryStore) Del(key string) error {
s.data.Delete(key)
func (s *MemoryStore) Del(keys ...string) error {
for _, k := range keys {
s.data.Delete(k)
}
return nil
}
func (s *MemoryStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) {
keys := make(map[string][]byte)
s.data.Range(func(k, v interface{}) bool {
key := k.(string)

Check failure on line 44 in internal/cache/in_memory.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

Error return value is not checked (errcheck)
if key[:len(prefix)] == prefix {
keys[key] = v.([]byte)

Check failure on line 46 in internal/cache/in_memory.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

Error return value is not checked (errcheck)
}
return true
})
return keys, nil
}
34 changes: 32 additions & 2 deletions internal/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func (r *RedisStore) Set(key string, value []byte, ttl time.Duration) error {
}

// Del removes a value by key.
func (r *RedisStore) Del(key string) error {
result, err := r.client.Del(r.ctx, key).Result()
func (r *RedisStore) Del(keys ...string) error {
result, err := r.client.Del(r.ctx, keys...).Result()
if err != nil {
return errors.Join(ErrCacheFailedToDel, err)
}
Expand All @@ -53,3 +53,33 @@ func (r *RedisStore) Del(key string) error {
}
return nil
}

// GetAllWithPrefix retrieves all key-value pairs that match a specific prefix.
func (r *RedisStore) GetAllWithPrefix(prefix string) (map[string][]byte, error) {
var cursor uint64
results := make(map[string][]byte)

for {
keys, newCursor, err := r.client.Scan(r.ctx, cursor, prefix+"*", 10).Result()
if err != nil {
return nil, errors.Join(ErrCacheFailedToScan, err)
}

for _, key := range keys {
value, err := r.client.Get(r.ctx, key).Result()
if errors.Is(err, redis.Nil) {
// Key has been removed between SCAN and GET, skip it
continue
} else if err != nil {
return nil, errors.Join(ErrCacheFailedToGet, err)
}
results[key] = []byte(value)
}

cursor = newCursor
if cursor == 0 {
break
}
}
return results, nil
}
19 changes: 8 additions & 11 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,21 +414,18 @@ func (p *Processor) StartProcessStatusUpdatesInStorage() {
return
case statusUpdate := <-p.storageStatusUpdateCh:
// Ensure no duplicate statuses
actualUpdateStatusMap, err := p.updateStatusMap(statusUpdate)
err := p.updateStatusMap(statusUpdate)
if err != nil {
p.logger.Error("failed to update status", slog.String("err", err.Error()))
return
}

if len(actualUpdateStatusMap) >= p.processStatusUpdatesBatchSize {
p.checkAndUpdate(ctx, actualUpdateStatusMap)

// Reset ticker to delay the next tick, ensuring the interval starts after the batch is processed.
// This prevents unnecessary immediate updates and maintains the intended time interval between batches.
ticker.Reset(p.processStatusUpdatesInterval)
}
case <-ticker.C:
statusUpdatesMap := p.getStatusUpdateMap()
statusUpdatesMap, err := p.getAllTransactionStatuses()
if err != nil {
p.logger.Error("failed to get all transaction statuses", slog.String("err", err.Error()))
return
}

if len(statusUpdatesMap) > 0 {
p.checkAndUpdate(ctx, statusUpdatesMap)

Expand Down Expand Up @@ -465,7 +462,7 @@ func (p *Processor) checkAndUpdate(ctx context.Context, statusUpdatesMap StatusU
p.logger.Error("failed to bulk update statuses", slog.String("err", err.Error()))
}

err = p.cacheStore.Del(CacheStatusUpdateKey)
err = p.clearCache(statusUpdatesMap)
if err != nil {
p.logger.Error("failed to clear status update map", slog.String("err", err.Error()))
}
Expand Down
122 changes: 64 additions & 58 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@ import (

"github.com/libsv/go-p2p/chaincfg/chainhash"

"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
)

type StatusUpdateMap map[chainhash.Hash]store.UpdateStatus

const CacheStatusUpdateKey = "status-updates"
const (
CacheStatusUpdateKey = "status-updates"
CacheTxPrefix = "tx-"
)

var (
ErrFailedToSerialize = errors.New("failed to serialize value")
Expand All @@ -22,51 +26,86 @@ func (p *Processor) GetProcessorMapSize() int {
return p.responseProcessor.getMapLen()
}

func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) (StatusUpdateMap, error) {
statusUpdatesMap := p.getStatusUpdateMap()
foundStatusUpdate, found := statusUpdatesMap[statusUpdate.Hash]
func (p *Processor) updateStatusMap(statusUpdate store.UpdateStatus) error {
currentStatusUpdate, err := p.getTransactionStatus(statusUpdate.Hash)
if err != nil {
if errors.Is(err, cache.ErrCacheNotFound) {
// if record doesn't exist, save new one
return p.setTransactionStatus(statusUpdate)
}
return err
}

if !found || shouldUpdateStatus(statusUpdate, foundStatusUpdate) {
if shouldUpdateStatus(statusUpdate, *currentStatusUpdate) {
if len(statusUpdate.CompetingTxs) > 0 {
statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, foundStatusUpdate.CompetingTxs)
statusUpdate.CompetingTxs = mergeUnique(statusUpdate.CompetingTxs, currentStatusUpdate.CompetingTxs)
}

statusUpdatesMap[statusUpdate.Hash] = statusUpdate
// TODO: combine status history
}

err := p.setStatusUpdateMap(statusUpdatesMap)
return p.setTransactionStatus(statusUpdate)
}

func (p *Processor) setTransactionStatus(status store.UpdateStatus) error {
bytes, err := json.Marshal(status)
if err != nil {
return statusUpdatesMap, err
return errors.Join(ErrFailedToSerialize, err)
}

return statusUpdatesMap, nil
err = p.cacheStore.Set(CacheTxPrefix+status.Hash.String(), bytes, processStatusUpdatesIntervalDefault)
if err != nil {
return err
}
return nil
}

func (p *Processor) setStatusUpdateMap(statusUpdatesMap StatusUpdateMap) error {
bytes, err := serializeStatusMap(statusUpdatesMap)
func (p *Processor) getTransactionStatus(hash chainhash.Hash) (*store.UpdateStatus, error) {
bytes, err := p.cacheStore.Get(CacheTxPrefix + hash.String())
if err != nil {
return err
return nil, err
}

err = p.cacheStore.Set(CacheStatusUpdateKey, bytes, processStatusUpdatesIntervalDefault)
var status store.UpdateStatus
err = json.Unmarshal(bytes, &status)
if err != nil {
return err
return nil, err
}
return nil

return &status, nil
}

func (p *Processor) getStatusUpdateMap() StatusUpdateMap {
existingMap, err := p.cacheStore.Get(CacheStatusUpdateKey)
func (p *Processor) getAllTransactionStatuses() (StatusUpdateMap, error) {
statuses := make(StatusUpdateMap)
keys, err := p.cacheStore.GetAllWithPrefix(CacheTxPrefix)
if err != nil {
return nil, err
}

if err == nil {
statusUpdatesMap, err := deserializeStatusMap(existingMap)
if err == nil {
return statusUpdatesMap
for key, value := range keys {
hash, err := chainhash.NewHashFromStr(key[len(CacheTxPrefix):])
if err != nil {
return nil, err
}

var status store.UpdateStatus
err = json.Unmarshal(value, &status)
if err != nil {
return nil, err
}

statuses[*hash] = status
}

return statuses, nil
}

func (p *Processor) clearCache(updateStatusMap StatusUpdateMap) error {
keys := make([]string, len(updateStatusMap))
for k, _ := range updateStatusMap {

Check failure on line 104 in internal/metamorph/processor_helpers.go

View workflow job for this annotation

GitHub Actions / Static check

unnecessary assignment to the blank identifier (S1005)

Check failure on line 104 in internal/metamorph/processor_helpers.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

range: should omit 2nd value from range; this loop is equivalent to `for k := range ...` (revive)
keys = append(keys, CacheTxPrefix+k.String())
}

// If the key doesn't exist or there was an error unmarshalling the value return new map
return make(StatusUpdateMap)
return p.cacheStore.Del(keys...)
}

func shouldUpdateStatus(new, found store.UpdateStatus) bool {
Expand Down Expand Up @@ -122,36 +161,3 @@ func mergeUnique(arr1, arr2 []string) []string {

return uniqueSlice
}

func serializeStatusMap(updateStatusMap StatusUpdateMap) ([]byte, error) {
serializeMap := make(map[string]store.UpdateStatus)
for k, v := range updateStatusMap {
serializeMap[k.String()] = v
}

bytes, err := json.Marshal(serializeMap)
if err != nil {
return nil, errors.Join(ErrFailedToSerialize, err)
}
return bytes, nil
}

func deserializeStatusMap(data []byte) (StatusUpdateMap, error) {
serializeMap := make(map[string]store.UpdateStatus)
updateStatusMap := make(StatusUpdateMap)

err := json.Unmarshal(data, &serializeMap)
if err != nil {
return nil, errors.Join(ErrFailedToDeserialize, err)
}

for k, v := range serializeMap {
hash, err := chainhash.NewHashFromStr(k)
if err != nil {
return nil, errors.Join(ErrFailedToDeserialize, err)
}
updateStatusMap[*hash] = v
}

return updateStatusMap, nil
}

0 comments on commit b3cddd5

Please sign in to comment.