Add graceful shutdown to loki.write draining the WAL #5804

Merged
14 commits merged on Dec 11, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -41,6 +41,8 @@ Main (unreleased)

- The `remote.http` component can optionally define a request body. (@tpaschalis)

- Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi)

### Bugfixes

- Update `pyroscope.ebpf` to fix a logical bug causing it to profile too many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev)
88 changes: 64 additions & 24 deletions component/common/loki/client/manager.go
@@ -35,28 +35,48 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {}

func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {}

type Stoppable interface {
type StoppableWatcher interface {
Stop()
Drain()
}

type StoppableClient interface {
Stop()
StopNow()
}

// watcherClientPair represents a watcher and client that are coupled together, or just a single client when the WAL is disabled.
type watcherClientPair struct {
watcher StoppableWatcher
client StoppableClient
}

// Stop will proceed to stop, in order, the possibly-nil watcher and the client.
func (p watcherClientPair) Stop(drain bool) {
// if the config has WAL disabled, there will be no watcher per client config
if p.watcher != nil {
// if drain enabled, drain the WAL
if drain {
p.watcher.Drain()
}
p.watcher.Stop()
}

// subsequently stop the client
p.client.Stop()
}

// Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry
// from the scrape targets, to the remote write clients themselves.
//
// Right now it just supports instantiating the WAL writer side of the future-to-be WAL enabled client. In follow-up
// work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client
// types: Logger, Multi and WAL.
type Manager struct {
name string
clients []Client
walWatchers []Stoppable
name string

// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
stoppableClients []StoppableClient
clients []Client
pairs []watcherClientPair

entries chan loki.Entry
once sync.Once
@@ -78,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
watchers := make([]Stoppable, 0, len(clientCfgs))
stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
pairs := make([]watcherClientPair, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
clientName := GetClientName(cfg)
@@ -103,7 +122,6 @@
if err != nil {
return nil, fmt.Errorf("error starting queue client: %w", err)
}
stoppableClients = append(stoppableClients, queue)

// subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
// series cache whenever a segment is deleted.
@@ -116,22 +134,27 @@
level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
watcher.Start()

watchers = append(watchers, watcher)
pairs = append(pairs, watcherClientPair{
watcher: watcher,
client: queue,
})
} else {
client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
if err != nil {
return nil, fmt.Errorf("error starting client: %w", err)
}

clients = append(clients, client)
stoppableClients = append(stoppableClients, client)

pairs = append(pairs, watcherClientPair{
client: client,
})
}
}
manager := &Manager{
clients: clients,
stoppableClients: stoppableClients,
walWatchers: watchers,
entries: make(chan loki.Entry),
clients: clients,
pairs: pairs,
entries: make(chan loki.Entry),
}
if walCfg.Enabled {
manager.name = buildManagerName("wal", clientCfgs...)
@@ -174,8 +197,8 @@ func (m *Manager) startWithForward() {
}

func (m *Manager) StopNow() {
for _, c := range m.stoppableClients {
c.StopNow()
for _, pair := range m.pairs {
pair.client.StopNow()
}
}

@@ -187,18 +210,35 @@ func (m *Manager) Chan() chan<- loki.Entry {
return m.entries
}

// Stop stops the manager without draining the Write-Ahead Log, even if that mode is enabled.
func (m *Manager) Stop() {
m.StopWithDrain(false)
}

// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
// the Watchers will attempt to drain the WAL completely.
// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then
// the clients are shut down accordingly.
func (m *Manager) StopWithDrain(drain bool) {
// first close the receiving channel and wait for the forwarding goroutine to finish
m.once.Do(func() { close(m.entries) })
m.wg.Wait()
// close wal watchers
for _, walWatcher := range m.walWatchers {
walWatcher.Stop()
}
// close clients
for _, c := range m.stoppableClients {
c.Stop()

var stopWG sync.WaitGroup

// Depending on whether drain is enabled, the maximum time stopping a watcher and its client can take is
// the drain time of the watcher plus the drain time of the client. To minimize this, and since we keep a separate WAL for each
// client config, each (watcher, client) pair is stopped concurrently.
for _, pair := range m.pairs {
stopWG.Add(1)
go func(pair watcherClientPair) {
defer stopWG.Done()
pair.Stop(drain)
}(pair)
}

// wait for all pairs to be stopped
stopWG.Wait()
}

// GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
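To give a sense of how the two stop paths above are meant to be called, here is a minimal sketch; the `shutdown` helper and its `drainEnabled` flag are illustrative stand-ins for the loki.write component's shutdown wiring, not part of this diff:

```go
package example

import (
	"github.com/grafana/agent/component/common/loki/client"
)

// shutdown stops the write path, optionally draining the WAL first.
// drainEnabled is an illustrative flag; in the real component it would come
// from the loki.write configuration.
func shutdown(m *client.Manager, drainEnabled bool) {
	if drainEnabled {
		// Drain mode: each WAL watcher flushes as much as it can (bounded by
		// WatchConfig.DrainTimeout) before its client is stopped.
		m.StopWithDrain(true)
		return
	}
	// Previous behaviour: stop watchers and clients without draining.
	m.Stop()
}
```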
4 changes: 2 additions & 2 deletions component/common/loki/client/queue_client.go
@@ -29,7 +29,7 @@ import (
// StoppableWriteTo is a mixin of the WAL's WriteTo interface that can also be stopped.
type StoppableWriteTo interface {
agentWal.WriteTo
Stoppable
Stop()
StopNow()
}

@@ -38,7 +38,7 @@ type StoppableWriteTo interface {
type MarkerHandler interface {
UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending
Stoppable
Stop()
}

// queuedBatch is a batch specific to a tenant, that is considered ready to be sent.
7 changes: 6 additions & 1 deletion component/common/loki/wal/config.go
@@ -10,8 +10,9 @@ const (

// DefaultWatchConfig is the opinionated defaults for operating the Watcher.
var DefaultWatchConfig = WatchConfig{
MinReadFrequency: time.Millisecond * 250,
MinReadFrequency: 250 * time.Millisecond,
MaxReadFrequency: time.Second,
DrainTimeout: 15 * time.Second,
}

// Config contains all WAL-related settings.
@@ -49,6 +50,10 @@ type WatchConfig struct {
// MaxReadFrequency controls the maximum read frequency at which the Watcher polls the WAL for new records. As mentioned above,
// it caps the polling frequency to a maximum, to prevent the exponential backoff from making it too high.
MaxReadFrequency time.Duration

// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
// After that time, the Watcher is stopped immediately, dropping all the work in progress.
DrainTimeout time.Duration
}

// UnmarshalYAML implement YAML Unmarshaler
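As a rough illustration of the new setting, the snippet below builds a WatchConfig that starts from the defaults in this hunk and only tightens the drain window; the 5-second value and the helper function are illustrative:

```go
package example

import (
	"time"

	"github.com/grafana/agent/component/common/loki/wal"
)

// watchConfigWithShortDrain keeps the default read frequencies (250ms / 1s)
// and only shortens the drain window, so shutdown spends at most 5 seconds
// flushing the remaining WAL segments.
func watchConfigWithShortDrain() wal.WatchConfig {
	cfg := wal.DefaultWatchConfig
	cfg.DrainTimeout = 5 * time.Second
	return cfg
}
```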
88 changes: 88 additions & 0 deletions component/common/loki/wal/internal/watcher_state.go
@@ -0,0 +1,88 @@
package internal

import (
"sync"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
)

const (
// StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed
// ones, and checking for new ones.
StateRunning = iota

// StateDraining is an intermediate state between running and stopping. The watcher will attempt to consume all the data
// found in the WAL, ignoring errors and assuming all segments found are "closed", that is, no longer being written.
StateDraining

// StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly.
StateStopping
)

// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking if it's stopping, getting
// the current state, or blocking until it has stopped.
type WatcherState struct {
current int
mut sync.RWMutex
stoppingSignal chan struct{}
logger log.Logger
}

func NewWatcherState(l log.Logger) *WatcherState {
return &WatcherState{
current: StateRunning,
stoppingSignal: make(chan struct{}),
logger: l,
}
}

// Transition changes the state of WatcherState to next, reacting accordingly.
func (s *WatcherState) Transition(next int) {
s.mut.Lock()
defer s.mut.Unlock()

level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next))

// only perform the channel close if the state is not already stopping
// expect s.current to be either draining or running to perform a close
if next == StateStopping && s.current != next {
close(s.stoppingSignal)
}

// update state
s.current = next
}

// IsDraining evaluates to true if the current state is StateDraining.
func (s *WatcherState) IsDraining() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateDraining
}

// IsStopping evaluates to true if the current state is StateStopping.
func (s *WatcherState) IsStopping() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateStopping
}

// WaitForStopping returns a channel from which the caller can read, effectively waiting until the state changes to stopping.
func (s *WatcherState) WaitForStopping() <-chan struct{} {
return s.stoppingSignal
}

// printState returns a user-friendly name for the given Watcher state.
func printState(state int) string {
switch state {
case StateRunning:
return "running"
case StateDraining:
return "draining"
case StateStopping:
return "stopping"
default:
return "unknown"
}
}
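
The Watcher loop itself is not part of this file, so the sketch below only illustrates how this state holder is intended to be consumed while draining; it is assumed to live in the wal package (so it can import internal), and `consumeRemaining` is an illustrative placeholder for the watcher's real segment-reading routine:

```go
package wal

import (
	"time"

	"github.com/grafana/agent/component/common/loki/wal/internal"
)

// drainWithTimeout keeps consuming WAL segments while the state is draining,
// returning when the WAL is exhausted, the drain timeout elapses, or a stop
// is requested. consumeRemaining reports whether the WAL has been exhausted.
func drainWithTimeout(state *internal.WatcherState, timeout time.Duration, consumeRemaining func() bool) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for state.IsDraining() {
		if exhausted := consumeRemaining(); exhausted {
			return
		}
		select {
		case <-deadline.C:
			// DrainTimeout elapsed: drop the remaining work.
			return
		case <-state.WaitForStopping():
			// Stop was requested: exit promptly.
			return
		default:
			// Keep draining on the next iteration.
		}
	}
}
```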