-
Notifications
You must be signed in to change notification settings - Fork 2
/
updater.go
134 lines (114 loc) · 3.45 KB
/
updater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package geche
import (
"sync"
)
// UpdateFn is a type for a function to be called to get updated value
// when Updater has a cache miss.
type UpdateFn[K comparable, V any] func(key K) (V, error)
// Updater is a wrapper on any Geche interface implementation
// That calls cache update function if key does not exist in the cache.
// It only allows one Update function per key to be running at a single point of time,
// reducing odds to get a "cache centipede" situation.
type Updater[K comparable, V any] struct {
cache Geche[K, V]
updateFn UpdateFn[K, V]
pool chan struct{}
inFlight map[K]chan struct{}
mux sync.RWMutex
}
// NewCacheUpdater returns cache wrapped with Updater. It calls updateFn
// whenever Get function returns ErrNotFound to update cache key.
// Only one updateFn for a given key can run at the same time, and only
// poolSize updateFn with different keys san run simultaneously.
func NewCacheUpdater[K comparable, V any](
cache Geche[K, V],
updateFn UpdateFn[K, V],
poolSize int,
) *Updater[K, V] {
u := Updater[K, V]{
cache: cache,
updateFn: updateFn,
pool: make(chan struct{}, poolSize),
inFlight: make(map[K]chan struct{}, poolSize),
}
return &u
}
// checkAndWaitInFlight waits for other cache key update
// operation to finish. Returns true if had to wait (update operation
// for key was running).
func (u *Updater[K, V]) waitInFlight(key K) bool {
u.mux.RLock()
ch, ok := u.inFlight[key]
u.mux.RUnlock()
if !ok {
return false
}
<-ch // Wait for channel to be closed.
return true
}
func (u *Updater[K, V]) Set(key K, value V) {
u.cache.Set(key, value)
}
func (u *Updater[K, V]) SetIfPresent(key K, value V) (V, bool) {
return u.cache.SetIfPresent(key, value)
}
// Get returns value from the cache. If the value is not in the cache,
// it calls updateFn to get the value and update the cache first.
// Since updateFn can return error, Get is not guaranteed to always return the value.
// When cache update fails, Get will return the error that updateFn returned,
// and not ErrNotFound.
func (u *Updater[K, V]) Get(key K) (V, error) {
v, err := u.cache.Get(key)
// Cache miss - update the cache!
if err == ErrNotFound {
if u.waitInFlight(key) {
// If we had to wait, then other goroutine has already updated
// the cache. Returning it.
return u.cache.Get(key)
}
// Put token in the pool. Will wait if pool is full.
u.pool <- struct{}{}
u.mux.Lock()
u.inFlight[key] = make(chan struct{})
u.mux.Unlock()
defer func() {
// When finished cache update, releasing all locks.
u.mux.Lock()
ch, ok := u.inFlight[key]
if ok {
close(ch)
delete(u.inFlight, key)
}
u.mux.Unlock()
<-u.pool
}()
v, err = u.updateFn(key)
if err != nil {
return v, err
}
u.cache.Set(key, v)
}
return v, err
}
// Del deletes key from the cache.
func (u *Updater[K, V]) Del(key K) error {
return u.cache.Del(key)
}
// Snapshot returns a shallow copy of the cache.
// It locks the cache from modification for the duration of the copy.
func (u *Updater[K, V]) Snapshot() map[K]V {
return u.cache.Snapshot()
}
// Len returns the number of items in the cache.
func (u *Updater[K, V]) Len() int {
return u.cache.Len()
}
// ListByPrefix should only be called if underlying cache is KV.
// Otherwise it will panic.
func (u *Updater[K, V]) ListByPrefix(prefix string) ([]V, error) {
kv, ok := any(u.cache).(*KV[V])
if !ok {
panic("cache does not support ListByPrefix")
}
return kv.ListByPrefix(prefix)
}