-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpoller.go
246 lines (200 loc) · 5.67 KB
/
poller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
package poller
import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"time"
"github.com/onflow/flow-go-sdk"
"github.com/onflow/flow-go-sdk/client"
)
const DefaultMaxHeightRange = 250
type ErrorBehavior int
const (
// ErrorBehaviorContinue will log the error, but continue processing events
ErrorBehaviorContinue ErrorBehavior = iota
// ErrorBehaviorStop will log the error, and stop processing events
ErrorBehaviorStop
)
var ErrAbort = fmt.Errorf("polling aborted due to an error")
type EventPoller struct {
// StartHeight sets the starting height for the event poller. If not set, the latest sealed
// block height is used
StartHeight uint64
// PollingErrorBehavior sets the behavior when errors are encountered while polling for events.
PollingErrorBehavior ErrorBehavior
client *client.Client
interval time.Duration
subscriptions map[string][]*Subscription
lastHeader *flow.BlockHeader
}
type BlockEvent struct {
Event *flow.Event
}
type Subscription struct {
ID string
Channel chan *BlockEvent
Events []string
}
func NewEventPoller(client *client.Client, interval time.Duration) *EventPoller {
return &EventPoller{
client: client,
interval: interval,
subscriptions: make(map[string][]*Subscription),
}
}
// Subscribe creates a subscription for a list of events, and returns a Subscription struct, which
// contains a channel to receive events
func (p *EventPoller) Subscribe(events []string) *Subscription {
sub := &Subscription{
ID: randomString(16),
Channel: make(chan *BlockEvent),
Events: events,
}
for _, event := range events {
p.subscriptions[event] = append(p.subscriptions[event], sub)
}
return sub
}
// Unsubscribe removes subscription for all provided events
func (p *EventPoller) Unsubscribe(id string, events []string) {
for _, event := range events {
if _, ok := p.subscriptions[event]; !ok {
continue
}
for i, sub := range p.subscriptions[event] {
if sub.ID == id {
p.subscriptions[event] = append(p.subscriptions[event][:i], p.subscriptions[event][i+1:]...)
break
}
}
if len(p.subscriptions[event]) == 0 {
delete(p.subscriptions, event)
}
}
}
func (p *EventPoller) LastProcessedHeight() uint64 {
if p.lastHeader == nil {
return 0
}
return p.lastHeader.Height
}
// Run runs the event poller
func (p *EventPoller) Run(ctx context.Context) error {
var err error
p.lastHeader, err = p.startHeader(ctx)
if err != nil {
return fmt.Errorf("error getting start header: %w", err)
}
next := time.After(p.interval)
for {
select {
case <-ctx.Done():
return nil
case <-next:
// restart timer immediately so the poller runs approximately every interval instead of
// every interval plus processing time
next = time.After(p.interval)
newLatest, err := p.checkSubscriptions(ctx, p.lastHeader)
// module is shutting down
if err != nil && errors.Is(err, ctx.Err()) {
return nil
}
// error during polling, and we're configured to stop
if errors.Is(err, ErrAbort) {
return err
}
// otherwise, log and continue
if err != nil {
log.Println("error polling events: %v", err)
// Skip updating latest so we don't lose events. The next run will backfill any
// missed blocks
continue
}
p.lastHeader = newLatest
}
}
}
func (p *EventPoller) startHeader(ctx context.Context) (*flow.BlockHeader, error) {
if p.StartHeight > 0 {
return p.client.GetBlockHeaderByHeight(ctx, p.StartHeight)
}
return p.client.GetLatestBlockHeader(ctx, true)
}
func (p *EventPoller) checkSubscriptions(ctx context.Context, lastHeader *flow.BlockHeader) (*flow.BlockHeader, error) {
latest, err := p.client.GetLatestBlockHeader(ctx, true)
if err != nil {
return nil, fmt.Errorf("error getting latest header: %w", err)
}
// nothing to do
if lastHeader.Height >= latest.Height {
return lastHeader, nil
}
var header *flow.BlockHeader
for {
header = latest
// make sure the block range is not larger than the max, otherwise we'll need to break
// it up into multiple ranges
maxHeight := lastHeader.Height + DefaultMaxHeightRange
if latest.Height > maxHeight {
header, err = p.client.GetBlockHeaderByHeight(ctx, maxHeight)
if err != nil {
return nil, fmt.Errorf("error getting header for height %d: %w", maxHeight, err)
}
}
for eventSub := range p.subscriptions {
err = p.pollEvents(ctx, lastHeader.Height+1, header, eventSub)
if err != nil {
// module is shutting down
if ctx.Err() != nil {
return nil, ctx.Err()
}
log.Printf("error polling events %s for %d - %d: %v", eventSub, lastHeader.Height+1, header.Height, err)
if p.PollingErrorBehavior == ErrorBehaviorStop {
return nil, ErrAbort
}
}
}
if header.Height == latest.Height {
break
}
lastHeader = header
}
return header, nil
}
func (p *EventPoller) pollEvents(ctx context.Context, startHeight uint64, header *flow.BlockHeader, eventType string) error {
blockEvents, err := p.client.GetEventsForHeightRange(ctx, client.EventRangeQuery{
Type: eventType,
StartHeight: startHeight,
EndHeight: header.Height,
})
if err != nil {
return err
}
// sent notifications for events
for _, be := range blockEvents {
for _, event := range be.Events {
event := event
for _, sub := range p.subscriptions[event.Type] {
subEvent := &BlockEvent{
Event: &event,
}
select {
case <-ctx.Done():
return nil
case sub.Channel <- subEvent:
}
}
}
}
return nil
}
func randomString(n int) string {
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789")
s := make([]rune, n)
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
}
return string(s)
}