Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: [release-2.9.x]: feat: Add backoff to flush op #13275

Merged
merged 1 commit into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,23 @@ lifecycler:
# CLI flag: -ingester.flush-check-period
[flush_check_period: <duration> | default = 30s]
# The timeout before a flush is cancelled.
flush_op_backoff:
# Minimum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-min-period
[min_period: <duration> | default = 10s]

# Maximum backoff period when a flush fails. Each concurrent flush has its own
# backoff, see `ingester.concurrent-flushes`.
# CLI flag: -ingester.flush-op-backoff-max-period
[max_period: <duration> | default = 1m]

# Maximum retries for failed flushes.
# CLI flag: -ingester.flush-op-backoff-retries
[max_retries: <int> | default = 10]

# The timeout for an individual flush. Will be retried up to
# `flush-op-backoff-retries` times.
# CLI flag: -ingester.flush-op-timeout
[flush_op_timeout: <duration> | default = 10m]

Expand Down
37 changes: 28 additions & 9 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ import (
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/tenant"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"golang.org/x/net/context"

"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -126,8 +127,9 @@ func (i *Ingester) sweepStream(instance *instance, stream *stream, immediate boo
}

func (i *Ingester) flushLoop(j int) {
l := log.With(util_log.Logger, "loop", j)
defer func() {
level.Debug(util_log.Logger).Log("msg", "Ingester.flushLoop() exited")
level.Debug(l).Log("msg", "Ingester.flushLoop() exited")
i.flushQueuesDone.Done()
}()

Expand All @@ -138,9 +140,10 @@ func (i *Ingester) flushLoop(j int) {
}
op := o.(*flushOp)

err := i.flushUserSeries(op.userID, op.fp, op.immediate)
m := util_log.WithUserID(op.userID, l)
err := i.flushOp(m, op)
if err != nil {
level.Error(util_log.WithUserID(op.userID, util_log.Logger)).Log("msg", "failed to flush", "err", err)
level.Error(m).Log("msg", "failed to flush", "err", err)
}

// If we're exiting & we failed to flush, put the failed operation
Expand All @@ -152,7 +155,23 @@ func (i *Ingester) flushLoop(j int) {
}
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
func (i *Ingester) flushOp(l log.Logger, op *flushOp) error {
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()

b := backoff.New(ctx, i.cfg.FlushOpBackoff)
for b.Ongoing() {
err := i.flushUserSeries(ctx, op.userID, op.fp, op.immediate)
if err == nil {
break
}
level.Error(l).Log("msg", "failed to flush", "retries", b.NumRetries(), "err", err)
b.Wait()
}
return b.Err()
}

func (i *Ingester) flushUserSeries(ctx context.Context, userID string, fp model.Fingerprint, immediate bool) error {
instance, ok := i.getInstanceByID(userID)
if !ok {
return nil
Expand All @@ -166,9 +185,9 @@ func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediat
lbs := labels.String()
level.Info(util_log.Logger).Log("msg", "flushing stream", "user", userID, "fp", fp, "immediate", immediate, "num_chunks", len(chunks), "labels", lbs)

ctx := user.InjectOrgID(context.Background(), userID)
ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancel()
ctx = user.InjectOrgID(ctx, userID)
ctx, cancelFunc := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
defer cancelFunc()
err := i.flushChunks(ctx, fp, labels, chunks, chunkMtx)
if err != nil {
return fmt.Errorf("failed to flush chunks: %w, num_chunks: %d, labels: %s", err, len(chunks), lbs)
Expand Down
66 changes: 66 additions & 0 deletions pkg/ingester/flush_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"errors"
"fmt"
"os"
"sort"
Expand Down Expand Up @@ -100,6 +101,67 @@ func Benchmark_FlushLoop(b *testing.B) {
}
}

func Test_FlushOp(t *testing.T) {
t.Run("no error", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

_, ing := newTestStore(t, cfg, nil)

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.NoError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}))
})

t.Run("max retries exceeded", func(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.FlushOpBackoff.MinBackoff = time.Second
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushCheckPeriod = 100 * time.Millisecond

store, ing := newTestStore(t, cfg, nil)
store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
return errors.New("failed to write chunks")
}

ctx := user.InjectOrgID(context.Background(), "foo")
ins, err := ing.GetOrCreateInstance("foo")
require.NoError(t, err)

lbs := makeRandomLabels()
req := &logproto.PushRequest{Streams: []logproto.Stream{{
Labels: lbs.String(),
Entries: entries(5, time.Now()),
}}}
require.NoError(t, ins.Push(ctx, req))

time.Sleep(cfg.FlushCheckPeriod)
require.EqualError(t, ing.flushOp(gokitlog.NewNopLogger(), &flushOp{
immediate: true,
userID: "foo",
fp: ins.getHashForLabels(lbs),
}), "terminated after 1 retries")
})
}

func Test_Flush(t *testing.T) {
var (
store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
Expand Down Expand Up @@ -295,6 +357,10 @@ func defaultIngesterTestConfig(t testing.TB) Config {

cfg := Config{}
flagext.DefaultValues(&cfg)
cfg.FlushOpBackoff.MinBackoff = 100 * time.Millisecond
cfg.FlushOpBackoff.MaxBackoff = 10 * time.Second
cfg.FlushOpBackoff.MaxRetries = 1
cfg.FlushOpTimeout = 15 * time.Second
cfg.FlushCheckPeriod = 99999 * time.Hour
cfg.MaxChunkIdle = 99999 * time.Hour
cfg.ConcurrentFlushes = 1
Expand Down
17 changes: 14 additions & 3 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/multierror"
Expand Down Expand Up @@ -80,6 +81,7 @@ type Config struct {

ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpBackoff backoff.Config `yaml:"flush_op_backoff"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
Expand Down Expand Up @@ -123,7 +125,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 0, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout before a flush is cancelled.")
f.DurationVar(&cfg.FlushOpBackoff.MinBackoff, "ingester.flush-op-backoff-min-period", 10*time.Second, "Minimum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.DurationVar(&cfg.FlushOpBackoff.MaxBackoff, "ingester.flush-op-backoff-max-period", time.Minute, "Maximum backoff period when a flush fails. Each concurrent flush has its own backoff, see `ingester.concurrent-flushes`.")
f.IntVar(&cfg.FlushOpBackoff.MaxRetries, "ingester.flush-op-backoff-retries", 10, "Maximum retries for failed flushes.")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Minute, "The timeout for an individual flush. Will be retried up to `flush-op-backoff-retries` times.")
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 0, "How long chunks should be retained in-memory after they've been flushed.")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "How long chunks should sit in-memory with no updates before being flushed if they don't hit the max block size. This means that half-empty chunks will still be flushed after a certain period as long as they receive no further activity.")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "The targeted _uncompressed_ size in bytes of a chunk block When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
Expand Down Expand Up @@ -151,8 +156,14 @@ func (cfg *Config) Validate() error {
return err
}

if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
if cfg.FlushOpBackoff.MinBackoff > cfg.FlushOpBackoff.MaxBackoff {
return errors.New("invalid flush op min backoff: cannot be larger than max backoff")
}
if cfg.FlushOpBackoff.MaxRetries <= 0 {
return fmt.Errorf("invalid flush op max retries: %d", cfg.FlushOpBackoff.MaxRetries)
}
if cfg.FlushOpTimeout <= 0 {
return fmt.Errorf("invalid flush op timeout: %s", cfg.FlushOpTimeout)
}

if cfg.IndexShards <= 0 {
Expand Down
101 changes: 82 additions & 19 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
Expand Down Expand Up @@ -660,57 +661,119 @@ func TestIngester_asyncStoreMaxLookBack(t *testing.T) {

func TestValidate(t *testing.T) {
for i, tc := range []struct {
in Config
err bool
expected Config
in Config
expected Config
expectedErr string
}{
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
IndexShards: index.DefaultIndexShards,
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
expected: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
parsedEncoding: chunkenc.EncGZIP,
IndexShards: index.DefaultIndexShards,
},
},
{
in: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
IndexShards: index.DefaultIndexShards,
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
},
expected: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
parsedEncoding: chunkenc.EncSnappy,
ChunkEncoding: chunkenc.EncSnappy.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
parsedEncoding: chunkenc.EncSnappy,
},
},
{
in: Config{
IndexShards: index.DefaultIndexShards,
ChunkEncoding: "bad-enc",
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
},
expectedErr: "invalid encoding: bad-enc, supported: none, gzip, lz4-64k, snappy, lz4-256k, lz4-1M, lz4, flate, zstd",
},
{
in: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
},
FlushOpTimeout: 15 * time.Second,
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
expectedErr: "invalid flush op max retries: 0",
},
{
in: Config{
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
IndexShards: index.DefaultIndexShards,
MaxChunkAge: time.Minute,
},
err: true,
expectedErr: "invalid flush op timeout: 0s",
},
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
FlushOpBackoff: backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 1,
},
FlushOpTimeout: 15 * time.Second,
MaxChunkAge: time.Minute,
},
err: true,
expectedErr: "invalid ingester index shard factor: 0",
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
err := tc.in.Validate()
if tc.err {
require.NotNil(t, err)
return
if tc.expectedErr != "" {
require.EqualError(t, err, tc.expectedErr)
} else {
require.NoError(t, err)
require.Equal(t, tc.expected, tc.in)
}
require.Nil(t, err)
require.Equal(t, tc.expected, tc.in)
})
}
}
Expand Down
Loading
Loading