forked from redpanda-data/kminion
-
Notifications
You must be signed in to change notification settings - Fork 4
/
main.go
133 lines (116 loc) · 3.99 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/cloudhut/kminion/v2/e2e"
	"github.com/cloudhut/kminion/v2/kafka"
	"github.com/cloudhut/kminion/v2/logging"
	"github.com/cloudhut/kminion/v2/minion"
	"github.com/cloudhut/kminion/v2/prometheus"
	promclient "github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/automaxprocs/maxprocs"
	"go.uber.org/zap"
)
// Build metadata. The values below are only defaults; they are set at build
// time using ldflags.
var (
	// version holds KMinion's SemVer version (for example: v1.0.0).
	version = "development"

	// commit holds the last git commit for this build.
	commit = "N/A"

	// builtAt holds a human-readable date that tells when the binary was built.
	builtAt = "N/A"
)
// main wires KMinion together and runs it until a termination signal arrives.
//
// It loads the configuration, sets GOMAXPROCS, starts the minion service and
// (if enabled) the end-to-end monitoring service, registers the Prometheus
// exporter and serves /metrics and /ready over HTTP. Startup failures are
// reported through Fatal-level log calls (or a panic if the startup logger
// itself cannot be created). On SIGINT or SIGTERM the HTTP server is shut down
// gracefully, and main waits for that shutdown to finish before returning.
func main() {
	const (
		// readHeaderTimeout bounds how long a client may take to send its
		// request headers, so stalled connections cannot pile up.
		readHeaderTimeout = 10 * time.Second
		// shutdownTimeout bounds the graceful HTTP shutdown so a lingering
		// connection cannot keep the process alive indefinitely.
		shutdownTimeout = 10 * time.Second
	)

	startupLogger, err := zap.NewProduction()
	if err != nil {
		panic(fmt.Errorf("failed to create startup logger: %w", err))
	}

	cfg, err := newConfig(startupLogger)
	if err != nil {
		startupLogger.Fatal("failed to parse config", zap.Error(err))
	}

	logger := logging.NewLogger(cfg.Logger, cfg.Exporter.Namespace).Named("main")
	logger.Info("started kminion",
		zap.String("version", version),
		zap.String("built_at", builtAt),
		zap.String("commit", commit),
	)

	// Set GOMAXPROCS automatically. maxprocs reports through a printf-style
	// callback, which is forwarded to the structured logger.
	l := func(format string, a ...interface{}) {
		logger.Info(fmt.Sprintf(format, a...))
	}
	if _, err = maxprocs.Set(maxprocs.Logger(l)); err != nil {
		logger.Fatal("failed to set GOMAXPROCS automatically", zap.Error(err))
	}

	// Setup context that stops when the application receives an interrupt or
	// termination signal. SIGTERM is included because it is what process
	// supervisors commonly send to request a stop.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	wrappedRegisterer := promclient.WrapRegistererWithPrefix(cfg.Exporter.Namespace+"_", promclient.DefaultRegisterer)

	// Create kafka service
	kafkaSvc := kafka.NewService(cfg.Kafka, logger)

	// Create minion service
	// Prometheus exporter only talks to the minion service which
	// issues all the requests to Kafka and wraps the interface accordingly.
	minionSvc, err := minion.NewService(cfg.Minion, logger, kafkaSvc, cfg.Exporter.Namespace, ctx)
	if err != nil {
		logger.Fatal("failed to setup minion service", zap.Error(err))
	}

	err = minionSvc.Start(ctx)
	if err != nil {
		logger.Fatal("failed to start minion service", zap.Error(err))
	}

	// Create end to end testing service
	if cfg.Minion.EndToEnd.Enabled {
		e2eService, err := e2e.NewService(
			ctx,
			cfg.Minion.EndToEnd,
			logger,
			kafkaSvc,
			wrappedRegisterer,
		)
		if err != nil {
			// The log message is not a format string; the error travels in the field.
			logger.Fatal("failed to create end-to-end monitoring service", zap.Error(err))
		}

		if err = e2eService.Start(ctx); err != nil {
			logger.Fatal("failed to start end-to-end monitoring service", zap.Error(err))
		}
	}

	// The Prometheus exporter that implements the Prometheus collector interface
	exporter, err := prometheus.NewExporter(cfg.Exporter, logger, minionSvc)
	if err != nil {
		logger.Fatal("failed to setup prometheus exporter", zap.Error(err))
	}
	exporter.InitializeMetrics()

	promclient.MustRegister(exporter)
	http.Handle("/metrics",
		promhttp.InstrumentMetricHandler(
			promclient.DefaultRegisterer,
			promhttp.HandlerFor(
				promclient.DefaultGatherer,
				promhttp.HandlerOpts{},
			),
		),
	)
	http.Handle("/ready", minionSvc.HandleIsReady())

	// Start HTTP server. No Handler is set, so the routes registered on the
	// default mux above are served.
	address := net.JoinHostPort(cfg.Exporter.Host, strconv.Itoa(cfg.Exporter.Port))
	srv := &http.Server{
		Addr:              address,
		ReadHeaderTimeout: readHeaderTimeout,
	}

	// shutdownDone is closed once the graceful shutdown has finished.
	// ListenAndServe returns as soon as Shutdown is called, so main has to wait
	// on this channel; otherwise the process could exit mid-drain.
	shutdownDone := make(chan struct{})
	go func() {
		defer close(shutdownDone)

		<-ctx.Done()

		shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			logger.Error("error stopping HTTP server", zap.Error(err))
			os.Exit(1)
		}
	}()

	logger.Info("listening on address", zap.String("listen_address", address))
	if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		logger.Error("error starting HTTP server", zap.Error(err))
		os.Exit(1)
	}

	// Reaching this point means ListenAndServe returned ErrServerClosed, i.e.
	// the goroutine above has called Shutdown; wait for it to complete.
	<-shutdownDone

	logger.Info("kminion stopped")
}