-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhandler.go
145 lines (120 loc) · 3.01 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package gcs_monitor
import (
"encoding/json"
"errors"
"time"
"cloud.google.com/go/pubsub"
"go.uber.org/zap"
"golang.org/x/net/context"
storage "google.golang.org/api/storage/v1"
"fmt"
)
// BucketHandler receives messages from a Pub/Sub subscription and records
// metrics for each one. Build it with NewHandler, bind it to a subscription
// with Init, then call Run to start receiving.
type BucketHandler struct {
// logger is used for all log output of the handler.
logger *zap.Logger
// metrics receives the per-message counters and size observations.
metrics *PromMetrics
// workers is copied into the subscription's ReceiveSettings.NumGoroutines
// by Init.
workers int
// sub is the subscription messages are received from; it is set by Init.
sub *pubsub.Subscription
// ctx is the parent of the per-Receive contexts created in Run.
ctx context.Context
// topic is the topic name passed to Init, kept only for log fields.
topic string
}
// subName is the name of the Pub/Sub subscription the handler looks up and,
// when it is missing, creates.
const subName = "gcs-monitor"
// NewHandler returns a BucketHandler carrying the given logger, metrics sink,
// worker count and context. The subscription and topic are left unset; they
// are filled in by Init.
func NewHandler(logger *zap.Logger, metrics *PromMetrics, workers int, ctx context.Context) *BucketHandler {
	handler := new(BucketHandler)
	handler.logger = logger
	handler.metrics = metrics
	handler.workers = workers
	handler.ctx = ctx
	return handler
}
// Init checks that topic exists and binds the handler to the subscription
// named by subName, creating that subscription on the topic (with a 20 second
// ack deadline) when it does not exist yet.
//
// The receive settings (MaxExtension of one minute, NumGoroutines taken from
// the handler's worker count) are applied to the subscription handle in both
// cases, so the configured worker count is honoured for a pre-existing
// subscription as well as for a freshly created one.
//
// project is only used as a log field. An error is returned when an existence
// check fails, when the topic does not exist, or when the subscription cannot
// be created.
func (h *BucketHandler) Init(client *pubsub.Client, project string, topic string) error {
	ctx := context.Background()
	h.logger.Info("initializing pubsub",
		zap.String("project", project),
		zap.String("topic", topic),
	)
	h.topic = topic // for logging purposes later

	t := client.Topic(topic)
	ok, err := t.Exists(ctx)
	if err != nil {
		return err
	}
	if !ok {
		return errors.New("topic doesn't exist")
	}

	h.sub = client.Subscription(subName)
	ok, err = h.sub.Exists(ctx)
	if err != nil {
		return err
	}
	if !ok {
		cfg := pubsub.SubscriptionConfig{
			Topic:       t,
			AckDeadline: 20 * time.Second,
		}
		// Check the error before touching the result: on failure the
		// returned subscription must not be dereferenced, and h.sub keeps
		// the handle obtained above instead of being overwritten.
		created, err := client.CreateSubscription(ctx, subName, cfg)
		if err != nil {
			return err
		}
		h.sub = created
		h.logger.Debug("created subscription",
			zap.String("topic", h.topic),
			zap.String("subscription", h.sub.String()),
		)
	}

	// Receive settings live on the local handle, not on the server, so they
	// have to be set regardless of whether the subscription already existed.
	h.sub.ReceiveSettings.MaxExtension = 1 * time.Minute
	h.sub.ReceiveSettings.NumGoroutines = h.workers
	return nil
}
// handle records metrics for one notification message, acknowledges it, and
// finally invokes cancel (the cancel function of the context the message was
// received under) and logs that it did so.
//
// Every message bumps the file counter. When the payload format attribute is
// "JSON_API_V1" the payload is decoded as a storage.Object: a positive size is
// recorded as a size observation, a zero size bumps the zero-file counter, and
// a decoding failure is logged.
//
// The message is acknowledged in all cases. It has already been counted by
// then, and a payload that is in another format or cannot be decoded will not
// become processable on a later delivery; leaving it unacknowledged would only
// cause it to be delivered, and counted, again.
func (h *BucketHandler) handle(cancel context.CancelFunc, m *pubsub.Message) {
	defer func() {
		cancel()
		h.logger.Debug("canceling context for message",
			zap.String("msgID", m.ID),
		)
	}()

	// eventType and bucketId are always attributes on the message from GCS
	eventType := m.Attributes["eventType"]
	bucketID := m.Attributes["bucketId"]
	h.metrics.IncFileCounter(eventType, bucketID)

	if m.Attributes["payloadFormat"] == "JSON_API_V1" {
		var o storage.Object
		if err := json.Unmarshal(m.Data, &o); err != nil {
			h.logger.Error("error unmarshalling json payload",
				zap.String("msgID", m.ID),
				zap.Error(err),
			)
		} else if o.Size > 0 {
			h.metrics.ObserveSize(eventType, bucketID, float64(o.Size))
		} else {
			h.metrics.IncZeroFileCounter(eventType, bucketID)
		}
	}

	// Acknowledge before the deferred cancel runs.
	m.Ack()
}
// Run receives messages from the subscription set up by Init and passes each
// one to handle. It blocks until the handler's context is done and then
// returns.
//
// Each pass of the loop starts a Receive call under a fresh child context of
// the handler's context. handle cancels that child context after a message,
// which ends the Receive call, and the loop starts the next one. Errors from
// Receive other than context.Canceled are logged and printed to standard
// output, after which the loop continues.
func (h *BucketHandler) Run() {
	h.logger.Info("starting runnner for subscription",
		zap.String("subscription", h.sub.String()),
		zap.String("topic", h.topic),
	)
	for {
		// Once the parent context is done every child context is born
		// cancelled and Receive returns at once; without this check the loop
		// would spin forever instead of shutting down.
		if h.ctx.Err() != nil {
			return
		}

		h.logger.Debug("Receive start")
		cctx, cancel := context.WithCancel(h.ctx)
		err := h.sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
			h.logger.Debug("starting message from topic",
				zap.String("topic", h.topic),
				zap.String("msgID", m.ID),
			)
			h.handle(cancel, m)
		})
		// Receive can return without any message having been handled (for
		// example on an error), in which case nothing has cancelled cctx yet.
		// Release it here; calling cancel a second time is a no-op.
		cancel()

		if err != nil && err != context.Canceled {
			h.logger.Error("error processing message",
				zap.Error(err),
			)
			fmt.Printf("%v\n", err)
		}
		h.logger.Debug("Receive done")
	}
}