forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.go
231 lines (199 loc) · 6.99 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package nakadi
import (
"context"
"time"
"github.com/cenkalti/backoff"
)
// A Cursor marks the current read position in a stream. It is returned along with each received batch of
// events and is furthermore used to commit a batch of events (as well as all previous events).
//
// All fields except NakadiStreamID are encoded to JSON under the snake_case keys given in their tags.
type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	// NakadiStreamID is excluded from JSON encoding (tag "-").
	// NOTE(review): presumably the ID of the stream the batch was received on, which a
	// commit has to reference — confirm against the committer implementation.
	NakadiStreamID string `json:"-"`
}
// StreamOptions contains optional parameters that are used to create a StreamAPI.
// Zero-valued durations and nil callbacks are replaced by defaults when the options
// are passed to NewStream.
type StreamOptions struct {
	// The maximum number of Events in each chunk (and therefore per partition) of the stream (default: 1).
	// NOTE(review): a zero value is passed through unchanged, so the "default: 1" presumably
	// refers to Nakadi's server-side default when no limit is sent — confirm.
	BatchLimit           int
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits. If zero, defaultInitialRetryInterval is used.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits. If zero, defaultMaxRetryInterval is used.
	MaxRetryInterval     time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	// If zero, defaultMaxElapsedTime is used.
	CommitMaxElapsedTime time.Duration
	// NotifyErr is called when an error occurs that leads to a retry. It receives the error and the
	// delay before the next attempt. This notify function can be used to detect unhealthy streams.
	// If nil, a no-op function is used.
	NotifyErr            func(error, time.Duration)
	// NotifyOK is called whenever a successful operation was completed (a stream was opened or a
	// cursor was committed). This notify function can be used to detect that a stream is healthy
	// again. If nil, a no-op function is used.
	NotifyOK             func()
}
func (o *StreamOptions) withDefaults() *StreamOptions {
var copyOptions StreamOptions
if o != nil {
copyOptions = *o
}
if copyOptions.InitialRetryInterval == 0 {
copyOptions.InitialRetryInterval = defaultInitialRetryInterval
}
if copyOptions.MaxRetryInterval == 0 {
copyOptions.MaxRetryInterval = defaultMaxRetryInterval
}
if copyOptions.CommitMaxElapsedTime == 0 {
copyOptions.CommitMaxElapsedTime = defaultMaxElapsedTime
}
if copyOptions.NotifyErr == nil {
copyOptions.NotifyErr = func(_ error, _ time.Duration) {}
}
if copyOptions.NotifyOK == nil {
copyOptions.NotifyOK = func() {}
}
return ©Options
}
// defaultStreamOptions provided some default values (disabled; defaults are now applied by StreamOptions.withDefaults)
//var defaultStreamOptions = StreamOptions{
// InitialRetryInterval: time.Millisecond * 50,
// MaxRetryInterval: time.Minute,
// CommitMaxElapsedTime: time.Minute * 2,
//}
// NewStream is used to instantiate a new stream processing sub API. As for all sub APIs of the `go-nakadi`
// package NewStream receives a configured Nakadi client. Furthermore a valid subscription ID must be
// provided. Use the SubscriptionAPI in order to obtain subscriptions. The options parameter can be used
// to configure the behavior of the stream. The options may be nil.
//
// NewStream immediately starts a background routine that opens the stream and buffers the
// received batches for NextEvents. Call Close to stop it.
func NewStream(client *Client, subscriptionID string, options *StreamOptions) *StreamAPI {
	options = options.withDefaults()
	ctx, cancel := context.WithCancel(context.Background())

	streamBackOff := backoff.NewExponentialBackOff()
	streamBackOff.InitialInterval = options.InitialRetryInterval
	streamBackOff.MaxInterval = options.MaxRetryInterval

	commitBackOff := backoff.NewExponentialBackOff()
	commitBackOff.InitialInterval = options.InitialRetryInterval
	commitBackOff.MaxInterval = options.MaxRetryInterval
	// CommitMaxElapsedTime was previously never applied, so commits silently used the
	// backoff library's default elapsed-time limit instead of the configured one.
	commitBackOff.MaxElapsedTime = options.CommitMaxElapsedTime

	streamAPI := &StreamAPI{
		opener: &simpleStreamOpener{
			client:         client,
			subscriptionID: subscriptionID,
			batchLimit:     options.BatchLimit},
		committer: &simpleCommitter{
			client:         client,
			subscriptionID: subscriptionID},
		// Buffered so the background routine can read up to 10 batches ahead of the consumer.
		eventCh: make(chan eventsOrError, 10),
		ctx:     ctx,
		cancel:  cancel,
		// Both backoffs are bound to ctx so that Close also aborts pending retries.
		streamBackOff: backoff.WithContext(streamBackOff, ctx),
		commitBackOff: backoff.WithContext(commitBackOff, ctx),
		notifyErr:     options.NotifyErr,
		notifyOK:      options.NotifyOK}
	go streamAPI.startStream()
	return streamAPI
}
// A StreamAPI is a sub API which is used to consume events from a specific subscription using Nakadi's
// high level stream API. In order to ensure that only successfully processed events are committed, it is
// crucial to commit cursors of respective event batches in the same order they were received.
//
// NOTE(review): commitBackOff is shared by all CommitCursor calls without any locking, so
// concurrent commits likely race on the backoff state — confirm that callers serialize commits.
type StreamAPI struct {
	// opener (re-)establishes the event stream in the background routine.
	opener        streamOpener
	// committer sends cursors to Nakadi on behalf of CommitCursor.
	committer     committer
	// eventCh carries batches (or read errors) from the background routine to NextEvents.
	eventCh       chan eventsOrError
	// ctx is canceled through cancel by Close; this ends NextEvents and the background routine.
	ctx           context.Context
	cancel        context.CancelFunc
	// commitBackOff and streamBackOff control the retries of cursor commits and stream opens.
	commitBackOff backoff.BackOff
	streamBackOff backoff.BackOff
	// notifyErr and notifyOK are the (defaulted) callbacks taken from StreamOptions.
	notifyErr     func(error, time.Duration)
	notifyOK      func()
}
// NextEvents reads the next batch of events from the stream and returns the encoded events along with the
// respective cursor. It blocks until the batch of events can be read from the stream, or the stream is closed.
// Once the stream was closed, it returns context.Canceled.
func (s *StreamAPI) NextEvents() (Cursor, []byte, error) {
	select {
	case <-s.ctx.Done():
		return Cursor{}, nil, context.Canceled
	case next, ok := <-s.eventCh:
		if !ok {
			// eventCh is closed by the background routine when it terminates. Report that
			// instead of handing out an empty batch with a nil error.
			return Cursor{}, nil, context.Canceled
		}
		return next.cursor, next.events, next.err
	}
}
// CommitCursor commits a cursor to Nakadi. A failed commit is retried according to the
// commit backoff, and every failure that leads to a retry is reported to NotifyErr. The
// error of the last attempt is returned. NotifyOK is called once the commit succeeded.
func (s *StreamAPI) CommitCursor(cursor Cursor) error {
	var lastErr error
	attempt := func() error {
		lastErr = s.committer.commitCursor(cursor)
		return lastErr
	}
	// The outcome is tracked through lastErr, so RetryNotify's own result is not needed.
	_ = backoff.RetryNotify(attempt, s.commitBackOff, s.notifyErr)
	if lastErr != nil {
		return lastErr
	}
	s.notifyOK()
	return nil
}
// Close ends the stream by canceling the StreamAPI's context. Afterwards NextEvents returns
// context.Canceled and the background routine is signaled to stop. Close always returns nil
// and may be called more than once.
func (s *StreamAPI) Close() error {
	defer s.cancel()
	return nil
}
// startStream is the background routine started by NewStream. It opens a stream using the
// streamOpener and forwards every batch (or read error) to eventCh. After a failed read, the
// stream is closed and a new one is opened, so the routine never terminates on errors. It
// returns only once the StreamAPI was closed, and closes eventCh on the way out.
//
// Close cannot interrupt a read that is already blocked in nextEvents. The cancellation is
// noticed once that call returns.
func (s *StreamAPI) startStream() {
	defer close(s.eventCh)
	for {
		stream, err := s.openStreamWithRetry()
		if err != nil {
			// After Close the context-bound backoff stops without waiting. Without this
			// check the loop would call openStream over and over with no delay, and the
			// goroutine would never exit.
			if s.ctx.Err() != nil {
				return
			}
			continue
		}
		s.notifyOK()
		closed := s.forwardEvents(stream)
		stream.closeStream()
		if closed {
			return
		}
	}
}

// openStreamWithRetry opens a new stream, retrying with streamBackOff until an attempt
// succeeds, the backoff gives up, or the StreamAPI is closed. It returns the last error.
func (s *StreamAPI) openStreamWithRetry() (streamer, error) {
	var stream streamer
	var err error
	// The outcome is tracked through stream/err, so RetryNotify's own result is not needed.
	_ = backoff.RetryNotify(func() error {
		stream, err = s.opener.openStream()
		return err
	}, s.streamBackOff, s.notifyErr)
	return stream, err
}

// forwardEvents reads batches from stream and hands them to eventCh until a read fails or
// the StreamAPI is closed. Batches without events are skipped (presumably Nakadi
// keep-alives). A read error is forwarded to the consumer before returning. The result
// reports whether the StreamAPI was closed; the caller is responsible for closing stream.
func (s *StreamAPI) forwardEvents(stream streamer) (closed bool) {
	for {
		if s.ctx.Err() != nil {
			return true
		}
		cursor, events, err := stream.nextEvents()
		if err == nil && len(events) == 0 {
			continue
		}
		select {
		case <-s.ctx.Done():
			return true
		case s.eventCh <- eventsOrError{cursor: cursor, events: events, err: err}:
		}
		if err != nil {
			// Signal the caller to close this stream and open a new one.
			return false
		}
	}
}
// Internal seams of the stream API. StreamAPI depends only on these method sets, never on
// the concrete implementations that open streams, read from them and commit cursors.
type (
	// streamOpener is an internally used interface which is used to establish a new stream.
	streamOpener interface {
		openStream() (streamer, error)
	}

	// streamer is an internally used interface which is used to consume events from a stream.
	streamer interface {
		nextEvents() (Cursor, []byte, error)
		closeStream() error
	}

	// committer is an internally used interface which is used to commit cursors.
	committer interface {
		commitCursor(cursor Cursor) error
	}
)
// eventsOrError is used to represent a successful or failed batch read. It is the unit passed
// over StreamAPI.eventCh from the background routine to NextEvents, which returns all three
// fields unchanged.
type eventsOrError struct {
	// cursor and events are the values returned by streamer.nextEvents for this read.
	cursor Cursor
	events []byte
	// err is non-nil when the read failed.
	err    error
}