Skip to content

Commit

Permalink
Merge pull request #860 from google/exporter-shutdown
Browse files Browse the repository at this point in the history
Explicitly stop the `Exporter` when main routines are done.
  • Loading branch information
jaqx0r authored May 6, 2024
2 parents c1e84ca + 64c82f4 commit 4e8528c
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 64 deletions.
10 changes: 4 additions & 6 deletions cmd/mtail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os/signal"
"runtime"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -249,23 +248,22 @@ func main() {
if *oneShot {
switch *oneShotFormat {
case "prometheus":
var wg sync.WaitGroup
e, err := exporter.New(ctx, &wg, store, eOpts...)
e, err := exporter.New(ctx, store, eOpts...)
if err != nil {
glog.Error(err)
cancel()
wg.Wait()
e.Stop()
os.Exit(1) //nolint:gocritic // false positive
}
err = e.Write(os.Stdout)
if err != nil {
glog.Error(err)
cancel()
wg.Wait()
e.Stop()
os.Exit(1) //nolint:gocritic // false positive
}
cancel()
wg.Wait()
e.Stop()
os.Exit(0) //nolint:gocritic // false positive
case "json":
err = store.WriteMetrics(os.Stdout)
Expand Down
28 changes: 15 additions & 13 deletions internal/exporter/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
// Exporter manages the export of metrics to passive and active collectors.
type Exporter struct {
ctx context.Context
cancelFunc context.CancelFunc
wg sync.WaitGroup
store *metrics.Store
pushInterval time.Duration
Expand All @@ -40,6 +41,7 @@ type Exporter struct {
exportDisabled bool
pushTargets []pushOptions
initDone chan struct{}
shutdownDone chan struct{}
}

// Option configures a new Exporter.
Expand Down Expand Up @@ -83,24 +85,19 @@ func DisableExport() Option {
}
}

var (
ErrNeedsStore = errors.New("exporter needs a Store")
ErrNeedsWaitgroup = errors.New("exporter needs a WaitGroup")
)
var ErrNeedsStore = errors.New("exporter needs a Store")

// New creates a new Exporter.
func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options ...Option) (*Exporter, error) {
func New(ctx context.Context, store *metrics.Store, options ...Option) (*Exporter, error) {
if store == nil {
return nil, ErrNeedsStore
}
if wg == nil {
return nil, ErrNeedsWaitgroup
}
e := &Exporter{
ctx: ctx,
store: store,
initDone: make(chan struct{}),
store: store,
initDone: make(chan struct{}),
shutdownDone: make(chan struct{}),
}
e.ctx, e.cancelFunc = context.WithCancel(ctx)
defer close(e.initDone)
if err := e.SetOption(options...); err != nil {
return nil, err
Expand Down Expand Up @@ -128,20 +125,25 @@ func New(ctx context.Context, wg *sync.WaitGroup, store *metrics.Store, options
}
e.StartMetricPush()

wg.Add(1)
// This routine manages shutdown of the Exporter.
go func() {
defer wg.Done()
<-e.initDone
// Wait for the context to be completed before waiting for subroutines.
if !e.exportDisabled {
<-e.ctx.Done()
}
e.wg.Wait()
close(e.shutdownDone)
}()
return e, nil
}

// Stop instructs the exporter to shut down. The function returns once the exporter has finished.
func (e *Exporter) Stop() {
e.cancelFunc()
<-e.shutdownDone
}

// SetOption takes one or more option functions and applies them in order to Exporter.
func (e *Exporter) SetOption(options ...Option) error {
for _, option := range options {
Expand Down
32 changes: 10 additions & 22 deletions internal/exporter/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand All @@ -22,37 +21,26 @@ const prefix = "prefix"

func TestCreateExporter(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer cancel()
store := metrics.NewStore()
_, err := New(ctx, &wg, store)

e, err := New(ctx, store)
if err != nil {
t.Errorf("New(ctx, wg, store) unexpected error: %v", err)
t.Errorf("New(ctx, store) unexpected error: %v", err)
}
cancel()
wg.Wait()
ctx, cancel = context.WithCancel(context.Background())
e.Stop()

failopt := func(*Exporter) error {
return errors.New("busted") // nolint:goerr113
}
_, err = New(ctx, &wg, store, failopt)
_, err = New(ctx, store, failopt)
if err == nil {
t.Errorf("unexpected success")
t.Error("New(ctx, store, fail) -> unexpected success")
}
cancel()
wg.Wait()
}

func TestNewErrors(t *testing.T) {
ctx := context.Background()
store := metrics.NewStore()
var wg sync.WaitGroup
_, err := New(ctx, nil, store)
if err == nil {
t.Error("New(ctx, nil, store) expecting error, received nil")
}
_, err = New(ctx, &wg, nil)
_, err = New(ctx, nil)
if err == nil {
t.Error("New(ctx, wg, nil) expecting error, received nil")
t.Error("New(ctx, nil) -> nil, expecting error")
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/exporter/graphite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ func TestHandleGraphite(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer cancel()

ms := metrics.NewStore()
for _, metric := range tc.metrics {
testutil.FatalIfErr(t, ms.Add(metric))
}
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
e, err := New(ctx, ms, Hostname("gunstar"))
testutil.FatalIfErr(t, err)
response := httptest.NewRecorder()
e.HandleGraphite(response, &http.Request{})
Expand All @@ -64,8 +65,7 @@ func TestHandleGraphite(t *testing.T) {
t.Errorf("failed to read response %s", err)
}
testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{}))
cancel()
wg.Wait()
e.Stop()
})
}
}
9 changes: 5 additions & 4 deletions internal/exporter/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ func TestHandleJSON(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
defer cancel()

ms := metrics.NewStore()
for _, metric := range tc.metrics {
testutil.FatalIfErr(t, ms.Add(metric))
}
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
e, err := New(ctx, ms, Hostname("gunstar"))
testutil.FatalIfErr(t, err)
response := httptest.NewRecorder()
e.HandleJSON(response, &http.Request{})
Expand All @@ -158,8 +159,8 @@ func TestHandleJSON(t *testing.T) {
t.Errorf("failed to read response: %s", err)
}
testutil.ExpectNoDiff(t, tc.expected, string(b), testutil.IgnoreUnexported(sync.RWMutex{}))
cancel()
wg.Wait()

e.Stop()
})
}
}
17 changes: 8 additions & 9 deletions internal/exporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"math"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -255,8 +254,9 @@ func TestHandlePrometheus(t *testing.T) {
for _, tc := range handlePrometheusTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ms := metrics.NewStore()
for _, metric := range tc.metrics {
testutil.FatalIfErr(t, ms.Add(metric))
Expand All @@ -267,14 +267,13 @@ func TestHandlePrometheus(t *testing.T) {
if !tc.progLabel {
opts = append(opts, OmitProgLabel())
}
e, err := New(ctx, &wg, ms, opts...)
e, err := New(ctx, ms, opts...)
testutil.FatalIfErr(t, err)
r := strings.NewReader(tc.expected)
if err = promtest.CollectAndCompare(e, r); err != nil {
t.Error(err)
}
cancel()
wg.Wait()
e.Stop()
})
}
}
Expand Down Expand Up @@ -334,8 +333,9 @@ func TestWritePrometheus(t *testing.T) {
for _, tc := range writePrometheusTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ms := metrics.NewStore()
for _, metric := range tc.metrics {
testutil.FatalIfErr(t, ms.Add(metric))
Expand All @@ -344,16 +344,15 @@ func TestWritePrometheus(t *testing.T) {
Hostname("gunstar"),
OmitProgLabel(),
}
e, err := New(ctx, &wg, ms, opts...)
e, err := New(ctx, ms, opts...)
testutil.FatalIfErr(t, err)

var buf bytes.Buffer
err = e.Write(&buf)
testutil.FatalIfErr(t, err)
testutil.ExpectNoDiff(t, tc.expected, buf.String())

cancel()
wg.Wait()
e.Stop()
})
}
}
10 changes: 5 additions & 5 deletions internal/exporter/varz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -73,13 +72,14 @@ func TestHandleVarz(t *testing.T) {
for _, tc := range handleVarzTests {
tc := tc
t.Run(tc.name, func(t *testing.T) {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ms := metrics.NewStore()
for _, metric := range tc.metrics {
testutil.FatalIfErr(t, ms.Add(metric))
}
e, err := New(ctx, &wg, ms, Hostname("gunstar"))
e, err := New(ctx, ms, Hostname("gunstar"))
testutil.FatalIfErr(t, err)
response := httptest.NewRecorder()
e.HandleVarz(response, &http.Request{})
Expand All @@ -91,8 +91,8 @@ func TestHandleVarz(t *testing.T) {
t.Errorf("failed to read response: %s", err)
}
testutil.ExpectNoDiff(t, tc.expected, string(b))
cancel()
wg.Wait()

e.Stop()
})
}
}
3 changes: 2 additions & 1 deletion internal/mtail/mtail.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *Server) initRuntime() (err error) {

// initExporter sets up an Exporter for this Server.
func (m *Server) initExporter() (err error) {
m.e, err = exporter.New(m.ctx, &m.wg, m.store, m.eOpts...)
m.e, err = exporter.New(m.ctx, m.store, m.eOpts...)
if err != nil {
return err
}
Expand Down Expand Up @@ -234,6 +234,7 @@ func (m *Server) SetOption(options ...Option) error {
// TODO(jaq): remove this once the test server is able to trigger polls on the components.
func (m *Server) Run() error {
m.wg.Wait()
m.e.Stop()
if m.compileOnly {
glog.Info("compile-only is set, exiting")
return nil
Expand Down

0 comments on commit 4e8528c

Please sign in to comment.