Skip to content

Commit

Permalink
feat(querier/query-frontend): track and log congestion control latency (
Browse files Browse the repository at this point in the history
#12058)

Signed-off-by: Danny Kopping <[email protected]>
  • Loading branch information
Danny Kopping authored Feb 27, 2024
1 parent b1a61b3 commit 609fdb3
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 84 deletions.
2 changes: 2 additions & 0 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ func RecordRangeAndInstantQueryMetrics(
"ingester_chunk_decompressed_bytes", humanizeBytes(uint64(stats.Ingester.Store.Chunk.GetDecompressedBytes())),
// Total lines post filtering.
"ingester_post_filter_lines", stats.Ingester.Store.Chunk.GetPostFilterLines(),
// Time spent being blocked on congestion control.
"congestion_control_latency", stats.CongestionControlLatency(),
}...)

logValues = append(logValues, tagsToKeyValues(queryTags)...)
Expand Down
13 changes: 13 additions & 0 deletions pkg/logqlmodel/stats/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func (c *Context) Ingester() Ingester {
}
}

// Querier returns a copy of the querier statistics accumulated so far.
// NOTE(review): the struct is copied without atomic loads, while
// AddCongestionControlLatency updates one of its fields atomically — confirm
// callers only read it once writers have finished.
func (c *Context) Querier() Querier {
return c.querier
}

// Caches returns the cache statistics accumulated so far.
func (c *Context) Caches() Caches {
return Caches{
Expand Down Expand Up @@ -279,6 +284,10 @@ func (r Result) ChunkRefsFetchTime() time.Duration {
return time.Duration(r.Querier.Store.ChunkRefsFetchTime + r.Ingester.Store.ChunkRefsFetchTime)
}

// CongestionControlLatency returns the congestion control latency recorded in
// the querier's store statistics, converted to a time.Duration. Unlike
// ChunkRefsFetchTime, the ingester's store statistics are not included.
func (r Result) CongestionControlLatency() time.Duration {
return time.Duration(r.Querier.Store.CongestionControlLatency)
}

// TotalDuplicates returns the sum of the TotalDuplicates counters from the
// querier and ingester chunk statistics.
func (r Result) TotalDuplicates() int64 {
return r.Querier.Store.Chunk.TotalDuplicates + r.Ingester.Store.Chunk.TotalDuplicates
}
Expand Down Expand Up @@ -360,6 +369,10 @@ func (c *Context) AddChunkRefsFetchTime(i time.Duration) {
atomic.AddInt64(&c.store.ChunkRefsFetchTime, int64(i))
}

// AddCongestionControlLatency atomically adds i to the congestion control
// latency counter held in the querier's store statistics.
// NOTE(review): the neighbouring Add* methods (AddChunkRefsFetchTime,
// AddChunksDownloaded) accumulate into c.store, whereas this one writes to
// c.querier.Store — confirm this value is carried into the final Result.
func (c *Context) AddCongestionControlLatency(i time.Duration) {
atomic.AddInt64(&c.querier.Store.CongestionControlLatency, int64(i))
}

// AddChunksDownloaded atomically adds i to the store's TotalChunksDownloaded
// counter.
func (c *Context) AddChunksDownloaded(i int64) {
atomic.AddInt64(&c.store.TotalChunksDownloaded, i)
}
Expand Down
201 changes: 122 additions & 79 deletions pkg/logqlmodel/stats/stats.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/logqlmodel/stats/stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ message Store {

// Time spent fetching chunk refs from index.
int64 chunkRefsFetchTime = 5 [(gogoproto.jsontag) = "chunkRefsFetchTime"];

// Time spent being blocked on congestion control.
int64 congestionControlLatency = 6 [(gogoproto.jsontag) = "congestionControlLatency"];
}

message Chunk {
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,6 +1561,7 @@ var (
"totalDuplicates": 8
},
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand All @@ -1585,6 +1586,7 @@ var (
"totalDuplicates": 19
},
"chunksDownloadTime": 16,
"congestionControlLatency": 0,
"totalChunksRef": 17,
"totalChunksDownloaded": 18,
"chunkRefsFetchTime": 19,
Expand Down Expand Up @@ -2018,6 +2020,7 @@ var (
TotalDuplicates: 19,
},
ChunksDownloadTime: 16,
CongestionControlLatency: 0,
TotalChunksRef: 17,
TotalChunksDownloaded: 18,
ChunkRefsFetchTime: 19,
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/queryrange/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var emptyStats = `"stats": {
"ingester" : {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand All @@ -40,6 +41,7 @@ var emptyStats = `"stats": {
"querier": {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/chunk/client/congestion/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"golang.org/x/time/rate"

"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk/client"
)

Expand Down Expand Up @@ -92,6 +93,9 @@ func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.Re

// TODO(dannyk): use hedging client to handle requests, do NOT hedge retries

start := time.Now()
statsCtx := stats.FromContext(ctx)

rc, sz, err := a.retrier.Do(
func(attempt int) (io.ReadCloser, int64, error) {
a.metrics.requests.Add(1)
Expand All @@ -111,6 +115,8 @@ func (a *AIMDController) GetObject(ctx context.Context, objectKey string) (io.Re
a.metrics.backoffSec.Add(delay.Seconds())
}

statsCtx.AddCongestionControlLatency(time.Since(start))

// It is vitally important that retries are DISABLED in the inner implementation.
// Some object storage clients implement retries internally, and this will interfere here.
return a.inner.GetObject(ctx, objectKey)
Expand Down
14 changes: 9 additions & 5 deletions pkg/storage/chunk/client/congestion/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/chunk/client"
)

Expand Down Expand Up @@ -165,8 +166,10 @@ func TestAIMDReducedThroughput(t *testing.T) {
cli := newMockObjectClient(triggeredFailer{trigger: &trigger})
ctrl.Wrap(cli)

statsCtx, ctx := stats.NewContext(context.Background())

// run for 1 second, measure the per-second rate of requests & successful responses
count, success := runAndMeasureRate(ctrl, time.Second)
count, success := runAndMeasureRate(ctx, ctrl, time.Second)
require.Greater(t, count, 1.0)
require.Greater(t, success, 1.0)
// no time spent backing off because the per-second limit will not be hit
Expand Down Expand Up @@ -195,7 +198,7 @@ func TestAIMDReducedThroughput(t *testing.T) {
}(&trigger)

// now, run the requests again but there will now be a failure rate & some throttling involved
count, success = runAndMeasureRate(ctrl, time.Second)
count, success = runAndMeasureRate(ctx, ctrl, time.Second)
done <- true

wg.Wait()
Expand All @@ -206,9 +209,12 @@ func TestAIMDReducedThroughput(t *testing.T) {

// should have fewer successful requests than total since we are failing some
require.Less(t, success, count)

// should have registered some congestion latency in stats
require.NotZero(t, statsCtx.Querier().Store.CongestionControlLatency)
}

func runAndMeasureRate(ctrl Controller, duration time.Duration) (float64, float64) {
func runAndMeasureRate(ctx context.Context, ctrl Controller, duration time.Duration) (float64, float64) {
var count, success float64

tick := time.NewTimer(duration)
Expand All @@ -218,8 +224,6 @@ func runAndMeasureRate(ctrl Controller, duration time.Duration) (float64, float6
case <-tick.C:
goto result
default:
ctx := context.Background()

count++
_, _, err := ctrl.GetObject(ctx, "foo")
if err == nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/marshal/legacy/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var queryTests = []struct {
"ingester" : {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand All @@ -83,6 +84,7 @@ var queryTests = []struct {
"querier": {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand Down
2 changes: 2 additions & 0 deletions pkg/util/marshal/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const emptyStats = `{
"ingester" : {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand All @@ -51,6 +52,7 @@ const emptyStats = `{
"querier": {
"store": {
"chunksDownloadTime": 0,
"congestionControlLatency": 0,
"totalChunksRef": 0,
"totalChunksDownloaded": 0,
"chunkRefsFetchTime": 0,
Expand Down

0 comments on commit 609fdb3

Please sign in to comment.