-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtempo.go
104 lines (94 loc) · 2.05 KB
/
tempo.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package tempo
import (
"time"
)
// item is the unit of work handled by the dispatcher. It is the empty
// interface, so a value of any type can be queued and batched.
type item interface{}
// Config holds the settings that NewDispatcher copies into a Dispatcher:
// the time interval between dispatches and the batch size limit.
type Config struct {
// Interval is the delay used whenever the dispatch timer is armed.
Interval time.Duration
// MaxBatchItems is the batch size limit; Start uses it to size the
// buffer in which it collects the items of one batch.
MaxBatchItems int
}
// NewDispatcher builds a Dispatcher from c. The internal doWork and stop
// channels and the exported Q and Batch channels are all created
// unbuffered, Interval and MaxBatchItems are copied from c, and
// DispatchedCount starts at zero. c must not be nil.
func NewDispatcher(c *Config) *Dispatcher {
	d := new(Dispatcher)

	// Signalling and data channels.
	d.doWork = make(chan bool)
	d.stop = make(chan bool)
	d.Q = make(chan item)
	d.Batch = make(chan []item)

	// Settings taken over from the caller's configuration.
	d.Interval = c.Interval
	d.MaxBatchItems = c.MaxBatchItems

	// DispatchedCount keeps its zero value.
	return d
}
// Dispatcher collects the items received on Q into batches and publishes
// each batch on Batch. A batch is published when the interval timer fires
// while the batch is non-empty, or when another item arrives while the
// batch is already full.
type Dispatcher struct {
// doWork tells Start to publish the pending batch when true is received
// on it; the interval timer's callback sends on it.
doWork chan bool
// stop receives a value from Stop, which makes Start return.
stop chan bool
// timer is created by tick on its first call and re-armed by later calls.
timer *time.Timer
// Q is the input channel from which Start receives items
// (NewDispatcher creates it unbuffered).
Q chan item
// Batch is the output channel on which batches are published
// (NewDispatcher creates it unbuffered).
Batch chan []item
// Interval is the delay used each time the timer is armed.
Interval time.Duration
// MaxBatchItems is the batch size limit; Start uses it to size the
// buffer that holds the pending batch.
MaxBatchItems int
// DispatchedCount is the running total of items in the batches handed
// to Batch; Start resets it to zero when it handles a stop.
DispatchedCount int
}
// tick arms the interval timer. The first call creates the timer with
// time.AfterFunc; its callback sends true on doWork once Interval has
// elapsed. Every later call resets the existing timer so that it fires
// again after Interval.
func (d *Dispatcher) tick() {
	if d.timer == nil {
		// Lazy creation: no timer exists until the first tick.
		signal := func() { d.doWork <- true }
		d.timer = time.AfterFunc(d.Interval, signal)
		return
	}
	d.timer.Reset(d.Interval)
}
// dispatch drains batch until it is closed, collecting every item in the
// order received, then adds the number collected to DispatchedCount and
// sends the resulting slice on Batch. The caller must close batch
// beforehand, otherwise the drain never finishes. An empty batch yields a
// nil slice.
func (d *Dispatcher) dispatch(batch chan item) {
	var collected []item
	for {
		next, open := <-batch
		if !open {
			break
		}
		collected = append(collected, next)
	}

	// The counter is updated before the (possibly blocking) send.
	d.DispatchedCount += len(collected)
	d.Batch <- collected
}
// Start runs the dispatch loop on the calling goroutine and returns once a
// value is received on the stop channel (see Stop).
//
// Items received from Q accumulate in a buffered channel sized by
// MaxBatchItems; a limit below one is treated as one. The pending batch is
// published on Batch when
//   - true arrives on doWork (sent by the interval timer) and the batch is
//     non-empty, or
//   - an item arrives while the batch is already full, in which case the
//     full batch is flushed first and the new item starts the next batch.
//
// After every flush, and after a timer signal that finds the batch empty,
// the timer is re-armed for another Interval. On stop the timer is halted,
// DispatchedCount is reset to zero and items still pending are dropped.
func (d *Dispatcher) Start() {
	// A channel without capacity could never hold a pending item, and a
	// negative capacity makes make panic, so clamp the limit to at least one.
	limit := d.MaxBatchItems
	if limit < 1 {
		limit = 1
	}

	d.tick()
	batch := make(chan item, limit)

	// flush publishes the pending batch and installs a fresh, empty one.
	flush := func() {
		close(batch) // dispatch drains batch until it is closed
		d.dispatch(batch)
		batch = make(chan item, limit)
	}

	for {
		select {
		case m := <-d.Q:
			if len(batch) == cap(batch) {
				// The batch is full: publish it right here instead of
				// handing the item to a helper goroutine that re-queues it
				// on Q. This keeps items in arrival order and leaves no
				// goroutine blocked forever if the dispatcher is stopped.
				// The timer is paused so that it does not fire while
				// dispatch is waiting for a receiver on Batch.
				d.timer.Stop()
				flush()
				d.tick()
			}
			batch <- m
		case doWork := <-d.doWork:
			if !doWork {
				continue
			}
			if len(batch) > 0 {
				flush()
			}
			// Re-arm whether or not anything was published.
			d.tick()
		case <-d.stop:
			d.timer.Stop()
			d.DispatchedCount = 0
			return
		}
	}
}
// Stop asks the loop run by Start to exit by sending on the stop channel.
// With the unbuffered channel created by NewDispatcher the send blocks
// until Start receives it, so Stop should only be used with a dispatcher
// whose Start loop is running or is about to run.
func (d *Dispatcher) Stop() {
	// The timer is created by Start's first tick, so it is still nil when
	// Stop gets there first (for example right after `go d.Start()`).
	// Guard the call instead of dereferencing a nil timer; Start stops the
	// timer itself once it receives the stop signal.
	if d.timer != nil {
		d.timer.Stop()
	}
	d.stop <- true
}