forked from TRON-US/go-mfs
-
Notifications
You must be signed in to change notification settings - Fork 3
/
repub.go
197 lines (174 loc) · 4.79 KB
/
repub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
package mfs
import (
"context"
"time"
cid "github.com/ipfs/go-cid"
)
// PubFunc is the caller-supplied callback that defines what "publishing" a
// `Cid` actually means. It is handed a context together with the `Cid` to
// publish and reports failure through its error result.
type PubFunc func(ctx context.Context, c cid.Cid) error
// Republisher decides when a given entry gets published.
type Republisher struct {
	// TimeoutLong bounds how long a pending value may wait before it is
	// published, TimeoutShort is the delay restarted on every update, and
	// RetryTimeout is the pause between attempts after a failed publish.
	TimeoutLong, TimeoutShort, RetryTimeout time.Duration

	// pubfunc performs the actual publish.
	pubfunc PubFunc

	// update carries the most recent value handed to `Update`.
	update chan cid.Cid
	// immediatePublish carries the signal channels sent by `WaitPub`; `Run`
	// closes each one after handling the request.
	immediatePublish chan chan struct{}

	// ctx and cancel control the lifetime of the republisher.
	ctx    context.Context
	cancel func()
}
// NewRepublisher builds a Republisher that republishes the given root through
// pf, using tshort and tlong as the short and long time intervals. The retry
// interval is initialised to tlong. The republisher gets its own cancellable
// context derived from ctx.
func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration) *Republisher {
	runCtx, stop := context.WithCancel(ctx)

	rp := new(Republisher)
	rp.TimeoutShort = tshort
	rp.TimeoutLong = tlong
	rp.RetryTimeout = tlong
	// One buffered slot: holds the latest value that has not been picked up yet.
	rp.update = make(chan cid.Cid, 1)
	rp.pubfunc = pf
	rp.immediatePublish = make(chan chan struct{})
	rp.ctx = runCtx
	rp.cancel = stop
	return rp
}
// WaitPub waits for the current value to be published (or returns early
// if it already has).
//
// It hands a fresh signal channel to the `Run` loop and blocks until `Run`
// closes that channel. It returns:
//   - nil once `Run` has signalled completion,
//   - ctx.Err() if the caller's ctx is done first,
//   - the republisher's own context error if the republisher is shut down
//     first. Without this check a caller whose ctx never expires would block
//     forever once nothing is servicing the request any more.
func (rp *Republisher) WaitPub(ctx context.Context) error {
	wait := make(chan struct{})

	// Step 1: submit the request to the `Run` loop.
	select {
	case rp.immediatePublish <- wait:
	case <-ctx.Done():
		return ctx.Err()
	case <-rp.ctx.Done():
		return rp.ctx.Err()
	}

	// Step 2: wait for `Run` to close the signal channel.
	select {
	case <-wait:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-rp.ctx.Done():
		// The request may have completed at the same moment the republisher
		// was shut down; report success in that case.
		select {
		case <-wait:
			return nil
		default:
			return rp.ctx.Err()
		}
	}
}
// Close requests a publish of the current value through `WaitPub`, then
// cancels the republisher's context. It returns whatever `WaitPub` returned.
//
// NOTE(review): `WaitPub` is called with the republisher's own context, so
// this appears to rely on `Run` being active (or that context already being
// done) in order to return; confirm against callers.
func (rp *Republisher) Close() error {
	// TODO(steb): Wait for `Run` to stop
	defer rp.cancel()
	return rp.WaitPub(rp.ctx)
}
// Update sets the current value. The value is published after a delay, and
// each further call to Update may push that delay out, up to TimeoutLong.
func (rp *Republisher) Update(c cid.Cid) {
	select {
	case rp.update <- c:
		// The slot was free; c is now the pending value.
	case <-rp.update:
		// The slot held an older value, which has just been discarded.
		// Try to put c in its place.
		select {
		case rp.update <- c:
		default:
			// Someone else filled the slot in the meantime. Do not
			// retry: that means there is a concurrent publish, and
			// it is safe to let that one win.
		}
	}
}
// Run contains the core logic of the `Republisher`. It calls the user-defined
// `pubfunc` function whenever the `Cid` value is updated to a *new* value. The
// complexity comes from the fact that `pubfunc` may be slow so we need to batch
// updates.
//
// Algorithm:
//  1. When we receive the first update after publishing, we set a `longer` timer.
//  2. When we receive any update, we reset the `quick` timer.
//  3. If either the `quick` timeout or the `longer` timeout elapses,
//     we call `publish` with the latest updated value.
//
// The `longer` timer ensures that we delay publishing by at most
// `TimeoutLong`. The `quick` timer allows us to publish sooner if
// it looks like there are no more updates coming down the pipe.
//
// Note: If a publish fails, we retry repeatedly every RetryTimeout.
//
// lastPublished is the value considered already published when Run starts;
// updates equal to it are not published again. Run returns once the
// republisher's context is done.
func (rp *Republisher) Run(lastPublished cid.Cid) {
	// Create both timers stopped and drained; they are armed with Reset below.
	quick := time.NewTimer(0)
	if !quick.Stop() {
		<-quick.C
	}
	// Release the timer on every exit path, including cancellation while armed.
	defer quick.Stop()

	longer := time.NewTimer(0)
	if !longer.Stop() {
		<-longer.C
	}
	defer longer.Stop()

	// toPublish is the value waiting to be published; undefined means nothing
	// is pending.
	var toPublish cid.Cid
	for rp.ctx.Err() == nil {
		// waiter is set only when this iteration was triggered by `WaitPub`.
		var waiter chan struct{}

		select {
		case <-rp.ctx.Done():
			return
		case newValue := <-rp.update:
			// Skip already published values.
			if lastPublished.Equals(newValue) {
				// Break to the end of the select to cleanup any
				// timers.
				toPublish = cid.Undef
				break
			}

			// If we aren't already waiting to publish something,
			// reset the long timeout.
			if !toPublish.Defined() {
				longer.Reset(rp.TimeoutLong)
			}

			// Always reset the short timeout.
			quick.Reset(rp.TimeoutShort)

			// Finally, set the new value to publish.
			toPublish = newValue
			continue
		case waiter = <-rp.immediatePublish:
			// Make sure to grab the *latest* value to publish.
			select {
			case toPublish = <-rp.update:
			default:
			}

			// Avoid publishing duplicate values
			if lastPublished.Equals(toPublish) {
				toPublish = cid.Undef
			}
		case <-quick.C:
		case <-longer.C:
		}

		// Cleanup, publish, and close waiters.

		// 1. Stop any timers. Don't use the `if !t.Stop() { ... }`
		//    idiom as these timers may not be running.
		quick.Stop()
		select {
		case <-quick.C:
		default:
		}

		longer.Stop()
		select {
		case <-longer.C:
		default:
		}

		// 2. If we have a value to publish, publish it now.
		if toPublish.Defined() {
			for {
				if err := rp.pubfunc(rp.ctx, toPublish); err == nil {
					break
				}
				// Keep retrying until we succeed or we abort.
				// TODO(steb): We could try pulling new values
				// off `update` but that's not critical (and
				// complicates this code a bit). We'll pull off
				// a new value on the next loop through.
				//
				// Use an explicit timer rather than time.After so
				// it can be stopped when we abort.
				retry := time.NewTimer(rp.RetryTimeout)
				select {
				case <-retry.C:
				case <-rp.ctx.Done():
					retry.Stop()
					return
				}
			}
			lastPublished = toPublish
			toPublish = cid.Undef
		}

		// 3. Trigger anything waiting in `WaitPub`.
		if waiter != nil {
			close(waiter)
		}
	}
}