-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
122 lines (101 loc) · 2.44 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package main
import (
"context"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var logger *zap.Logger
func init() {
initLogger()
initMetrics()
}
func initLogger() {
level := zap.NewAtomicLevel()
level.SetLevel(zapcore.DebugLevel)
zapConfig := zap.Config{
Level: level,
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
MessageKey: "msg",
TimeKey: "time",
EncodeTime: zapcore.ISO8601TimeEncoder,
LevelKey: "level",
EncodeLevel: zapcore.CapitalLevelEncoder,
},
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
}
l, err := zapConfig.Build()
if err != nil {
panic(err)
}
logger = l
}
var (
msgCounter = prometheus.NewCounter(prometheus.CounterOpts{
Name: "sqs_ws_message_counter",
Help: "Numer of messages to ws",
})
)
func initMetrics() {
prometheus.MustRegister(msgCounter)
}
func main() {
var confFileName string
flag.StringVar(&confFileName, "c", "", "config file path")
flag.Parse()
if confFileName == "" {
flag.PrintDefaults()
return
}
logger.Info("read config from", zap.String("path", confFileName))
config, err := NewConfig(confFileName)
if err != nil {
logger.Sync()
logger.Fatal("parseConfig error", zap.NamedError("error", err))
}
exitch := make(chan bool, 1)
receiver := newSQSReceiver(exitch, config)
go receiver.run()
hub := newHub(receiver)
go hub.run()
m := http.NewServeMux()
m.Handle("/metrics", promhttp.Handler())
m.HandleFunc("/", serveHome) // TODO: disable on production
m.HandleFunc(config.WSEndpoint, func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
port := fmt.Sprintf(":%d", config.WSPort)
srv := &http.Server{
Addr: port,
Handler: m,
}
go func() {
if err := srv.ListenAndServe(); err != nil {
logger.Fatal("ListenAndServe: ", zap.NamedError("error", err))
logger.Sync()
os.Exit(1)
}
}()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh // block until signal comes
// close SQS handler and WS clients
exitch <- true
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := srv.Shutdown(ctx); err != nil {
logger.Fatal("Shutdown failed", zap.NamedError("error", err))
logger.Sync()
os.Exit(1)
}
logger.Info("sqs_ws finished")
}