-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmicrocache.go
462 lines (415 loc) · 11.7 KB
/
microcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
// microcache is a non-standard HTTP microcache implemented as Go middleware.
package microcache
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
)
type Microcache interface {
Middleware(http.Handler) http.Handler
Start()
Stop()
offsetIncr(time.Duration)
}
type microcache struct {
Nocache bool
Timeout time.Duration
TTL time.Duration
StaleIfError time.Duration
StaleRecache bool
StaleWhileRevalidate time.Duration
HashQuery bool
QueryIgnore map[string]bool
CollapsedForwarding bool
Vary []string
Driver Driver
Compressor Compressor
Monitor Monitor
Exposed bool
SuppressAgeHeader bool
stopMonitor chan bool
revalidating map[string]bool
revalidateMutex *sync.Mutex
collapse map[string]*sync.Mutex
collapseMutex *sync.Mutex
// Used to advance time for testing
offset time.Duration
offsetMutex *sync.RWMutex
}
type Config struct {
// Nocache prevents responses from being cached by default
// Can be overridden by the microcache-cache and microcache-nocache response headers
Nocache bool
// Timeout specifies the maximum execution time for backend responses
// Example: If the underlying handler takes more than 10s to respond,
// the request is cancelled and the response is treated as 503
// Recommended: 10s
// Default: 0
Timeout time.Duration
// TTL specifies a default ttl for cached responses
// Can be overridden by the microcache-ttl response header
// Recommended: 10s
// Default: 0
TTL time.Duration
// StaleWhileRevalidate specifies a period during which a stale response may be
// served immediately while the resource is fetched in the background. This can be
// useful for ensuring consistent response times at the cost of content freshness.
// More Info: https://tools.ietf.org/html/rfc5861
// Recommended: 20s
// Default: 0
StaleWhileRevalidate time.Duration
// StaleIfError specifies a default stale grace period
// If a request fails and StaleIfError is set, the object will be served as stale
// and the response will be re-cached for the duration of this grace period
// Can be overridden by the microcache-ttl-stale response header
// More Info: https://tools.ietf.org/html/rfc5861
// Recommended: 20s
// Default: 0
StaleIfError time.Duration
// StaleRecache specifies whether to re-cache the response object for ttl while serving
// stale response on backend error
// Recommended: true
// Default: false
StaleRecache bool
// CollapsedForwarding specifies whether to collapse duplicate requests
// This helps prevent servers with a cold cache from hammering the backend
// Default: false
CollapsedForwarding bool
// HashQuery determines whether all query parameters in the request URI
// should be hashed to differentiate requests
// Default: false
HashQuery bool
// QueryIgnore is a list of query parameters to ignore when hashing
// Default: nil
QueryIgnore []string
// Vary specifies a list of http request headers by which all requests
// should be differentiated. When making use of this option, it may be a good idea
// to normalize these headers first using a separate piece of middleware.
//
// []string{"accept-language", "accept-encoding", "xml-http-request"}
//
// Default: []string{}
Vary []string
// Driver specifies a cache storage driver
// Default: lru with 10,000 item capacity
Driver Driver
// Compressor specifies a compressor to use for reducing the memory required to cache
// response bodies
// Default: nil
Compressor Compressor
// Monitor is an optional parameter which will periodically report statistics about
// the cache to enable monitoring of cache size, cache efficiency and error rate
// Default: nil
Monitor Monitor
// Exposed determines whether to add a header to the response indicating the response state
// Microcache: ( HIT | MISS | STALE )
// Default: false
Exposed bool
// SuppressAgeHeader determines whether to suppress the age header in responses
// The age header is added by default to all HIT and STALE responses
// Age: ( seconds )
// Default: false
SuppressAgeHeader bool
}
// New creates and returns a configured microcache instance
func New(o Config) *microcache {
// Defaults
m := microcache{
Nocache: o.Nocache,
TTL: o.TTL,
StaleIfError: o.StaleIfError,
StaleRecache: o.StaleRecache,
StaleWhileRevalidate: o.StaleWhileRevalidate,
Timeout: o.Timeout,
HashQuery: o.HashQuery,
CollapsedForwarding: o.CollapsedForwarding,
Vary: o.Vary,
Driver: o.Driver,
Compressor: o.Compressor,
Monitor: o.Monitor,
Exposed: o.Exposed,
SuppressAgeHeader: o.SuppressAgeHeader,
revalidating: map[string]bool{},
revalidateMutex: &sync.Mutex{},
collapse: map[string]*sync.Mutex{},
collapseMutex: &sync.Mutex{},
offsetMutex: &sync.RWMutex{},
}
if o.Driver == nil {
m.Driver = NewDriverLRU(1e4) // default 10k cache items
}
if o.QueryIgnore != nil {
m.QueryIgnore = make(map[string]bool)
for _, key := range o.QueryIgnore {
m.QueryIgnore[key] = true
}
}
m.Start()
return &m
}
// Middleware can be used to wrap an HTTP handler with microcache functionality.
// It can also be passed to http middleware providers like alice as a constructor.
//
// mx := microcache.New(microcache.Config{TTL: 10 * time.Second})
// newHandler := mx.Middleware(yourHandler)
//
// Or with alice
//
// chain.Append(mx.Middleware)
//
func (m *microcache) Middleware(h http.Handler) http.Handler {
if m.Timeout > 0 {
h = http.TimeoutHandler(h, m.Timeout, "Timed out")
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Websocket passthrough
upgrade := strings.ToLower(r.Header.Get("connection")) == "upgrade"
if upgrade || m.Driver == nil {
if m.Monitor != nil {
m.Monitor.Miss()
}
h.ServeHTTP(w, r)
return
}
// Fetch request options
reqHash := getRequestHash(m, r)
req := m.Driver.GetRequestOpts(reqHash)
// Hard passthrough on non cacheable requests
if req.nocache {
if m.Monitor != nil {
m.Monitor.Miss()
}
h.ServeHTTP(w, r)
return
}
// CollapsedForwarding
// This implementation may collapse too many uncacheable requests.
// Refactor may be complicated.
if m.CollapsedForwarding {
m.collapseMutex.Lock()
mutex, ok := m.collapse[reqHash]
if !ok {
mutex = &sync.Mutex{}
m.collapse[reqHash] = mutex
}
m.collapseMutex.Unlock()
// Mutex serializes collapsible requests
mutex.Lock()
defer func() {
mutex.Unlock()
m.collapseMutex.Lock()
delete(m.collapse, reqHash)
m.collapseMutex.Unlock()
}()
if !req.found {
req = m.Driver.GetRequestOpts(reqHash)
}
}
// Fetch cached response object
var objHash string
var obj Response
if req.found {
objHash = req.getObjectHash(reqHash, r)
obj = m.Driver.Get(objHash)
if m.Compressor != nil {
obj = m.Compressor.Expand(obj)
}
}
// Non-cacheable request method passthrough and purge
if r.Method != "GET" && r.Method != "HEAD" && r.Method != "OPTIONS" {
if m.Monitor != nil {
m.Monitor.Miss()
}
if obj.found {
// HTTP spec requires caches to purge cached responses following
// successful unsafe request
ptw := passthroughWriter{w, 0}
h.ServeHTTP(&ptw, r)
if ptw.status >= 200 && ptw.status < 400 {
m.Driver.Remove(objHash)
}
} else {
h.ServeHTTP(w, r)
}
return
}
// Fresh response object found
if obj.found && obj.expires.After(m.now()) {
if m.Monitor != nil {
m.Monitor.Hit()
}
if m.Exposed {
w.Header().Set("microcache", "HIT")
}
m.setAgeHeader(w, obj)
obj.sendResponse(w)
return
}
// Stale While Revalidate
if obj.found && req.staleWhileRevalidate > 0 &&
obj.expires.Add(req.staleWhileRevalidate).After(m.now()) {
if m.Monitor != nil {
m.Monitor.Stale()
}
if m.Exposed {
w.Header().Set("microcache", "STALE")
}
m.setAgeHeader(w, obj)
obj.sendResponse(w)
// Dedupe revalidation
m.revalidateMutex.Lock()
_, revalidating := m.revalidating[objHash]
if !revalidating {
m.revalidating[objHash] = true
}
m.revalidateMutex.Unlock()
if !revalidating {
br := newBackgroundRequest(r)
go func() {
defer func() {
// Clear revalidation lock
m.revalidateMutex.Lock()
delete(m.revalidating, objHash)
m.revalidateMutex.Unlock()
}()
m.handleBackendResponse(h, w, br, reqHash, req, objHash, obj, true)
}()
}
return
} else {
m.handleBackendResponse(h, w, r, reqHash, req, objHash, obj, false)
return
}
})
}
func (m *microcache) handleBackendResponse(
h http.Handler,
w http.ResponseWriter,
r *http.Request,
reqHash string,
req RequestOpts,
objHash string,
obj Response,
background bool,
) {
if m.Monitor != nil {
m.Monitor.Backend()
}
// Backend Response
beres := Response{header: http.Header{}}
// Execute request
h.ServeHTTP(&beres, r)
if !beres.headerWritten {
beres.status = http.StatusOK
}
// Log Error
if beres.status >= 500 && m.Monitor != nil {
m.Monitor.Error()
}
// Serve Stale
if beres.status >= 500 && obj.found {
serveStale := obj.expires.Add(req.staleIfError).After(m.now())
// Extend stale response expiration by staleIfError grace period
if req.found && serveStale && req.staleRecache {
obj.expires = obj.date.Add(m.getOffset()).Add(req.ttl)
m.store(objHash, obj)
}
if !background && serveStale {
if m.Monitor != nil {
m.Monitor.Stale()
}
if m.Exposed {
w.Header().Set("microcache", "STALE")
}
m.setAgeHeader(w, obj)
obj.sendResponse(w)
return
}
}
// Backend Request succeeded
if beres.status >= 200 && beres.status < 400 {
if !req.found {
// Store request options
req = buildRequestOpts(m, beres, r)
m.Driver.SetRequestOpts(reqHash, req)
objHash = req.getObjectHash(reqHash, r)
}
// Cache response
if !req.nocache {
beres.expires = m.now().Add(req.ttl)
m.store(objHash, beres)
}
}
// Don't render response during background revalidate
if background {
return
}
if m.Monitor != nil {
m.Monitor.Miss()
}
if m.Exposed {
w.Header().Set("microcache", "MISS")
}
beres.sendResponse(w)
}
// Start starts the monitor and any other required background processes
func (m *microcache) Start() {
if m.stopMonitor != nil || m.Monitor == nil {
return
}
m.stopMonitor = make(chan bool)
go func() {
for {
select {
case <-time.After(m.Monitor.GetInterval()):
m.Monitor.Log(Stats{
Size: m.Driver.GetSize(),
})
case <-m.stopMonitor:
return
}
}
}()
}
// setAgeHeader sets the age header if not suppressed
func (m *microcache) setAgeHeader(w http.ResponseWriter, obj Response) {
if !m.SuppressAgeHeader {
age := (m.now().Unix() - obj.date.Unix())
w.Header().Set("age", fmt.Sprintf("%d", age))
}
}
// store sets the age header if not suppressed
func (m *microcache) store(objHash string, obj Response) {
obj.found = true
obj.date = time.Now()
if m.Compressor != nil {
m.Driver.Set(objHash, m.Compressor.Compress(obj))
} else {
m.Driver.Set(objHash, obj)
}
}
// Stop stops the monitor and any other required background processes
func (m *microcache) Stop() {
if m.stopMonitor == nil {
return
}
m.stopMonitor <- true
}
// Increments the offset for testing purposes
func (m *microcache) offsetIncr(o time.Duration) {
m.offsetMutex.Lock()
defer m.offsetMutex.Unlock()
m.offset += o
}
// Get offset
func (m *microcache) getOffset() time.Duration {
m.offsetMutex.RLock()
defer m.offsetMutex.RUnlock()
return m.offset
}
// Get current time with offset
func (m *microcache) now() time.Time {
return time.Now().Add(m.getOffset())
}