From 76151e277fd78d8963a04a3422d66e2291778ed8 Mon Sep 17 00:00:00 2001 From: Rueian Date: Sun, 5 Jan 2025 10:55:18 -0800 Subject: [PATCH] feat: new experimental gc friendly flatten cache Signed-off-by: Rueian --- cache.go | 63 ++++++++++++++++++++++++++++----------------------- cache_test.go | 5 ++++ 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/cache.go b/cache.go index 975d7ae0..2ae7d52c 100644 --- a/cache.go +++ b/cache.go @@ -221,25 +221,26 @@ func (f *flatentry) find(cmd string, ts int64) (ret RedisMessage, expired bool) } const lrBatchSize = 64 +const flattEntrySize = unsafe.Sizeof(flatentry{}) type lrBatch struct { m map[*flatentry]struct{} } -func NewFlattenCache(limit int64) CacheStore { +func NewFlattenCache(limit int) CacheStore { f := &flatten{ flights: make(map[string]*adapterEntry), cache: make(map[string]*flatentry), head: &flatentry{}, tail: &flatentry{}, size: 0, - limit: limit, + limit: int64(limit), } f.head.next = unsafe.Pointer(f.tail) f.tail.prev = unsafe.Pointer(f.head) f.lrup = sync.Pool{New: func() any { b := &lrBatch{m: make(map[*flatentry]struct{}, lrBatchSize)} - runtime.SetFinalizer(b, func() { + runtime.SetFinalizer(b, func(b *lrBatch) { f.llTailBatch(b) }) return b @@ -330,40 +331,46 @@ func (f *flatten) Flight(key, cmd string, ttl time.Duration, now time.Time) (Red if af = f.flights[fk]; af != nil { return RedisMessage{}, af } - f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} + if f.flights != nil { + f.flights[fk] = &adapterEntry{ch: make(chan struct{}), xat: ts + ttl.Milliseconds()} + } return RedisMessage{}, nil } -func (f *flatten) Update(key, cmd string, val RedisMessage) int64 { +func (f *flatten) Update(key, cmd string, val RedisMessage) (sxat int64) { fk := key + cmd - bs := val.CacheMarshal(nil) - fe := &flatentry{cmd: cmd, val: bs, ttl: val.CachePXAT(), size: int64(len(bs)+len(key)+len(cmd)) + int64(unsafe.Sizeof(flatentry{}))} - f.mu.Lock() + f.mu.RLock() af := f.flights[fk] + f.mu.RUnlock() if af != nil { - delete(f.flights, fk) - if af.xat < fe.ttl { - fe.ttl = af.xat + sxat = val.getExpireAt() + if af.xat < sxat || sxat == 0 { + sxat = af.xat + val.setExpireAt(sxat) } - } - f.size += fe.size - for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { - e := (*flatentry)(ep) - f.remove(e) - ep = e.next - } - if e := f.cache[key]; e == nil { - fe.key = key - f.cache[key] = fe - f.llAdd(fe) - } else { - e.insert(fe) - } - f.mu.Unlock() - if af != nil { + bs := val.CacheMarshal(nil) + fe := &flatentry{cmd: cmd, val: bs, ttl: sxat, size: int64(len(bs)+len(key)+len(cmd)) + int64(flattEntrySize)} + f.mu.Lock() + if f.flights != nil { + delete(f.flights, fk) + f.size += fe.size + for ep := f.head.next; f.size > f.limit && ep != unsafe.Pointer(f.tail); { + e := (*flatentry)(ep) + f.remove(e) + ep = e.next + } + if e := f.cache[key]; e == nil { + fe.key = key + f.cache[key] = fe + f.llAdd(fe) + } else { + e.insert(fe) + } + } + f.mu.Unlock() af.set(val, nil) } - return fe.ttl + return sxat } func (f *flatten) Cancel(key, cmd string, err error) { diff --git a/cache_test.go b/cache_test.go index 3e6d12f6..9010b8aa 100644 --- a/cache_test.go +++ b/cache_test.go @@ -183,6 +183,11 @@ func TestCacheStore(t *testing.T) { return NewSimpleCacheAdapter(&simple{store: map[string]RedisMessage{}}) }) }) + t.Run("FlattenCache", func(t *testing.T) { + test(t, func() CacheStore { + return NewFlattenCache(DefaultCacheBytes) + }) + }) } type simple struct {