forked from stoewer/go-nakadi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubscriptions.go
145 lines (128 loc) · 4.77 KB
/
subscriptions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package nakadi
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/cenkalti/backoff"
"github.com/pkg/errors"
)
// Subscription represents a subscription as used by the Nakadi high level API.
type Subscription struct {
ID string `json:"id,omitempty"`
OwningApplication string `json:"owning_application"`
EventTypes []string `json:"event_types"`
ConsumerGroup string `json:"consumer_group,omitempty"`
ReadFrom string `json:"read_from,omitempty"`
CreatedAt time.Time `json:"created_at,omitempty"`
}
// SubscriptionOptions is a set of optional parameters used to configure the SubscriptionAPI.
type SubscriptionOptions struct {
// Whether or not methods of the SubscriptionAPI retry when a request fails. If
// set to true InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
// no effect (default: false).
Retry bool
// The initial (minimal) retry interval used for the exponential backoff algorithm
// when retry is enables.
InitialRetryInterval time.Duration
// MaxRetryInterval the maximum retry interval. Once the exponential backoff reaches
// this value the retry intervals remain constant.
MaxRetryInterval time.Duration
// MaxElapsedTime is the maximum time spent on retries when when performing a request.
// Once this value was reached the exponential backoff is halted and the request will
// fail with an error.
MaxElapsedTime time.Duration
}
func (o *SubscriptionOptions) withDefaults() *SubscriptionOptions {
var copyOptions SubscriptionOptions
if o != nil {
copyOptions = *o
}
if copyOptions.InitialRetryInterval == 0 {
copyOptions.InitialRetryInterval = defaultInitialRetryInterval
}
if copyOptions.MaxRetryInterval == 0 {
copyOptions.MaxRetryInterval = defaultMaxRetryInterval
}
if copyOptions.MaxElapsedTime == 0 {
copyOptions.MaxElapsedTime = defaultMaxElapsedTime
}
return ©Options
}
// NewSubscriptionAPI crates a new instance of the SubscriptionAPI. As for all sub APIs of the `go-nakadi` package
// NewSubscriptionAPI receives a configured Nakadi client. The last parameter is a struct containing only optional \
// parameters. The options may be nil.
func NewSubscriptionAPI(client *Client, options *SubscriptionOptions) *SubscriptionAPI {
options = options.withDefaults()
var backOff backoff.BackOff
if options.Retry {
back := backoff.NewExponentialBackOff()
back.InitialInterval = options.InitialRetryInterval
back.MaxInterval = options.MaxRetryInterval
back.MaxElapsedTime = options.MaxElapsedTime
backOff = back
} else {
backOff = &backoff.StopBackOff{}
}
return &SubscriptionAPI{
client: client,
backOff: backOff}
}
// SubscriptionAPI is a sub API that is used to manage subscriptions.
type SubscriptionAPI struct {
client *Client
backOff backoff.BackOff
}
// List returns all available subscriptions.
func (s *SubscriptionAPI) List() ([]*Subscription, error) {
subscriptions := struct {
Items []*Subscription `json:"items"`
}{}
err := s.client.httpGET(s.backOff, s.subBaseURL(), &subscriptions, "unable to request subscriptions")
if err != nil {
return nil, err
}
return subscriptions.Items, nil
}
// Get obtains a single subscription identified by its ID.
func (s *SubscriptionAPI) Get(id string) (*Subscription, error) {
subscription := &Subscription{}
err := s.client.httpGET(s.backOff, s.subURL(id), subscription, "unable to request subscription")
if err != nil {
return nil, err
}
return subscription, err
}
// Create initializes a new subscription. If the subscription already exists the pre existing subscription
// is returned.
func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error) {
response, err := s.client.httpPOST(s.backOff, s.subBaseURL(), subscription)
if err != nil {
return nil, errors.Wrap(err, "unable to create subscription")
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusCreated {
problem := problemJSON{}
err := json.NewDecoder(response.Body).Decode(&problem)
if err != nil {
return nil, errors.Wrap(err, "unable to decode response body")
}
return nil, errors.Errorf("unable to create subscription: %s", problem.Detail)
}
subscription = &Subscription{}
err = json.NewDecoder(response.Body).Decode(subscription)
if err != nil {
return nil, errors.Wrap(err, "unable to decode response body")
}
return subscription, nil
}
// Delete removes an existing subscription.
func (s *SubscriptionAPI) Delete(id string) error {
return s.client.httpDELETE(s.backOff, s.subURL(id), "unable to delete subscription")
}
func (s *SubscriptionAPI) subURL(id string) string {
return fmt.Sprintf("%s/subscriptions/%s", s.client.nakadiURL, id)
}
func (s *SubscriptionAPI) subBaseURL() string {
return fmt.Sprintf("%s/subscriptions", s.client.nakadiURL)
}