-
Notifications
You must be signed in to change notification settings - Fork 0
/
retry.go
189 lines (152 loc) · 4.65 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package curator
import (
"math"
"math/rand"
"net"
"time"
"github.com/go-zookeeper/zk"
)
// Abstraction for retry policies to sleep
type RetrySleeper interface {
// Sleep for the given time
SleepFor(time time.Duration) error
}
// Abstracts the policy to use when retrying connections
type RetryPolicy interface {
// Called when an operation has failed for some reason.
// This method should return true to make another attempt.
AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool
}
type defaultRetrySleeper struct {
}
var DefaultRetrySleeper RetrySleeper = &defaultRetrySleeper{}
func (s *defaultRetrySleeper) SleepFor(d time.Duration) error {
time.Sleep(d)
return nil
}
// Mechanism to perform an operation on Zookeeper that is safe against disconnections and "recoverable" errors.
type RetryLoop interface {
// creates a retry loop calling the given proc and retrying if needed
CallWithRetry(proc func() (interface{}, error)) (interface{}, error)
}
type retryLoop struct {
done bool
retryCount int
startTime time.Time
retryPolicy RetryPolicy
retrySleeper RetrySleeper
tracer TracerDriver
}
func newRetryLoop(retryPolicy RetryPolicy, tracer TracerDriver) *retryLoop {
return &retryLoop{
startTime: time.Now(),
retryPolicy: retryPolicy,
tracer: tracer,
}
}
// return true if the given Zookeeper result code is retry-able
func (l *retryLoop) ShouldRetry(err error) bool {
if err == zk.ErrSessionExpired || err == zk.ErrSessionMoved {
return true
}
if netErr, ok := err.(net.Error); ok {
return netErr.Timeout() || netErr.Temporary()
}
return false
}
func (l *retryLoop) CallWithRetry(proc func() (interface{}, error)) (interface{}, error) {
for {
if ret, err := proc(); err == nil || !l.ShouldRetry(err) {
return ret, err
} else {
l.retryCount++
if sleeper := l.retrySleeper; sleeper == nil {
sleeper = DefaultRetrySleeper
} else {
if !l.retryPolicy.AllowRetry(l.retryCount, time.Now().Sub(l.startTime), sleeper) {
l.tracer.AddCount("retries-disallowed", 1)
return ret, err
} else {
l.tracer.AddCount("retries-allowed", 1)
}
}
}
}
return nil, nil
}
type SleepingRetry struct {
RetryPolicy
N int
getSleepTime func(retryCount int, elapsedTime time.Duration) time.Duration
}
func (r *SleepingRetry) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool {
if retryCount < r.N {
if err := sleeper.SleepFor(r.getSleepTime(retryCount, elapsedTime)); err != nil {
return false
}
return true
}
return false
}
// Retry policy that retries a max number of times
type RetryNTimes struct {
SleepingRetry
}
func NewRetryNTimes(n int, sleepBetweenRetries time.Duration) *RetryNTimes {
return &RetryNTimes{
SleepingRetry: SleepingRetry{
N: n,
getSleepTime: func(retryCount int, elapsedTime time.Duration) time.Duration { return sleepBetweenRetries },
},
}
}
// A retry policy that retries only once
type RetryOneTime struct {
RetryNTimes
}
func NewRetryOneTime(sleepBetweenRetry time.Duration) *RetryOneTime {
return &RetryOneTime{
*NewRetryNTimes(1, sleepBetweenRetry),
}
}
const (
MAX_RETRIES_LIMIT = 29
DEFAULT_MAX_SLEEP time.Duration = time.Duration(math.MaxInt32 * int64(time.Second))
)
// Retry policy that retries a set number of times with increasing sleep time between retries
type ExponentialBackoffRetry struct {
SleepingRetry
}
func NewExponentialBackoffRetry(baseSleepTime time.Duration, maxRetries int, maxSleep time.Duration) *ExponentialBackoffRetry {
if maxRetries > MAX_RETRIES_LIMIT {
maxRetries = MAX_RETRIES_LIMIT
}
return &ExponentialBackoffRetry{
SleepingRetry: SleepingRetry{
N: maxRetries,
getSleepTime: func(retryCount int, elapsedTime time.Duration) time.Duration {
sleepTime := time.Duration(int64(baseSleepTime) * rand.Int63n(1<<uint(retryCount)))
if sleepTime > maxSleep {
sleepTime = maxSleep
}
return sleepTime
},
}}
}
// A retry policy that retries until a given amount of time elapses
type RetryUntilElapsed struct {
SleepingRetry
maxElapsedTime time.Duration
}
func NewRetryUntilElapsed(maxElapsedTime, sleepBetweenRetries time.Duration) *RetryUntilElapsed {
return &RetryUntilElapsed{
SleepingRetry: SleepingRetry{
N: math.MaxInt64,
getSleepTime: func(retryCount int, elapsedTime time.Duration) time.Duration { return sleepBetweenRetries },
},
maxElapsedTime: maxElapsedTime,
}
}
func (r *RetryUntilElapsed) AllowRetry(retryCount int, elapsedTime time.Duration, sleeper RetrySleeper) bool {
return elapsedTime < r.maxElapsedTime && r.SleepingRetry.AllowRetry(retryCount, elapsedTime, sleeper)
}