From 968f37bea2308b86431590b938dfcf35da6c664f Mon Sep 17 00:00:00 2001 From: "Grot (@grafanabot)" <43478413+grafanabot@users.noreply.github.com> Date: Wed, 8 Nov 2023 19:46:42 +0100 Subject: [PATCH] [k174] Propagate query metrics and cache num information. (#11179) Backport 979530b6b94b489980b58ec2ee9cd46232149488 from #11176 --- **What this PR does / why we need it**: https://github.com/grafana/loki/pull/10858 removed the extraction of the query time header on the querier side and the generation of the cache number. This change adds them back and uses the headers of the response format. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) Co-authored-by: Karsten Jeschkies --- pkg/loki/modules.go | 11 ++++++-- pkg/querier/queryrange/codec.go | 20 ++++++++++++--- pkg/querier/queryrange/codec_test.go | 3 +++ pkg/querier/queryrange/limits_test.go | 2 +- .../queryrangebase/definitions/interface.go | 1 + .../queryrange/queryrangebase/middleware.go | 25 +++++++++++++++++++ .../queryrange/queryrangebase/query_range.go | 18 +++++++++++++ pkg/querier/queryrange/views.go | 9 +++++++ 8 files changed, 83 insertions(+), 6 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0059b4665842a..a548d05a9251b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -513,10 +513,17 @@ func (t *Loki) initQuerier() (services.Service, error) { t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler))) t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler))) - internalHandler := queryrangebase.MergeMiddlewares( + internalMiddlewares := []queryrangebase.Middleware{ serverutil.RecoveryMiddleware, queryrange.Instrument{Metrics: t.Metrics}, - ).Wrap(handler) + } + if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled { + internalMiddlewares = append( + internalMiddlewares, + queryrangebase.CacheGenNumberContextSetterMiddleware(t.cacheGenerationLoader), + ) + } + internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler) svc, err := querier.InitWorkerService( querierWorkerServiceConfig, diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 1752c9e1ca2fd..e3f1905f54d7c 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -383,6 +383,14 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest) ctx = httpreq.InjectQueryTags(ctx, queryTags) } + // Add query metrics + if queueTimeHeader := httpReq.Header.Get(string(httpreq.QueryQueueTimeHTTPHeader)); queueTimeHeader != "" { + queueTime, err := time.ParseDuration(queueTimeHeader) + if err == nil { + ctx = context.WithValue(ctx, httpreq.QueryQueueTimeHTTPHeader, queueTime) + } + } + // If there is not encoding flags in the context, we try the HTTP request. if encFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx); encFlags == nil { encFlags = httpreq.ExtractEncodingFlagsFromProto(r) @@ -524,13 +532,19 @@ func (Codec) EncodeHTTPGrpcResponse(_ context.Context, req *httpgrpc.HTTPRequest return nil, err } - return &httpgrpc.HTTPResponse{ + httpRes := &httpgrpc.HTTPResponse{ Code: int32(http.StatusOK), Body: buf.Bytes(), Headers: []*httpgrpc.Header{ {Key: "Content-Type", Values: []string{"application/json; charset=UTF-8"}}, }, - }, nil + } + + for _, h := range res.GetHeaders() { + httpRes.Headers = append(httpRes.Headers, &httpgrpc.Header{Key: h.Name, Values: h.Values}) + } + + return httpRes, nil } func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) { @@ -1593,7 +1607,7 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error) }, }, nil case *logproto.IndexStatsRequest: - return &logproto.IndexStatsResponse{}, nil + return &IndexStatsResponse{}, nil case *logproto.VolumeRequest: return &VolumeResponse{}, nil default: diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 70814ff83d091..880143976439e 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -1421,6 +1421,9 @@ func (badResponse) Reset() {} func (badResponse) String() string { return "noop" } func (badResponse) ProtoMessage() {} func (badResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader { return nil } +func (b badResponse) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + return b +} type badReader struct{} diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index a3b14efdbe588..55f880af3f15d 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -273,7 +273,7 @@ func Test_MaxQueryLookBack_Types(t *testing.T) { From: model.Time(now.UnixMilli()), Through: model.Time(now.Add(-90 * time.Minute).UnixMilli()), }, - expectedResponse: &logproto.IndexStatsResponse{}, + expectedResponse: &IndexStatsResponse{}, }, { request: &logproto.VolumeRequest{ diff --git a/pkg/querier/queryrange/queryrangebase/definitions/interface.go b/pkg/querier/queryrange/queryrangebase/definitions/interface.go index fb385817b5ba6..f5259bb085c36 100644 --- a/pkg/querier/queryrange/queryrangebase/definitions/interface.go +++ b/pkg/querier/queryrange/queryrangebase/definitions/interface.go @@ -56,4 +56,5 @@ type Response interface { proto.Message // GetHeaders returns the HTTP headers in the response. GetHeaders() []*PrometheusResponseHeader + WithHeaders([]PrometheusResponseHeader) Response } diff --git a/pkg/querier/queryrange/queryrangebase/middleware.go b/pkg/querier/queryrange/queryrangebase/middleware.go index 4e60ce2dec490..6d72086da0694 100644 --- a/pkg/querier/queryrange/queryrangebase/middleware.go +++ b/pkg/querier/queryrange/queryrangebase/middleware.go @@ -1,10 +1,13 @@ package queryrangebase import ( + "context" "net/http" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/tenant" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" ) const ( @@ -28,3 +31,25 @@ func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLo }) }) } + +func CacheGenNumberContextSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLoader) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + userIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + + cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs) + + res, err := next.Do(ctx, req) + if err != nil { + return nil, err + } + header := definitions.PrometheusResponseHeader{ + Name: ResultsCacheGenNumberHeaderName, + Values: []string{cacheGenNumber}} + return res.WithHeaders([]definitions.PrometheusResponseHeader{header}), nil + }) + }) +} diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 4c7426b1714b1..a2653eb8967be 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -88,6 +88,24 @@ func (resp *PrometheusResponse) minTime() int64 { return result[0].Samples[0].TimestampMs } +func convertPrometheusResponseHeadersToPointers(h []PrometheusResponseHeader) []*PrometheusResponseHeader { + if h == nil { + return nil + } + + resp := make([]*PrometheusResponseHeader, len(h)) + for i := range h { + resp[i] = &h[i] + } + + return resp +} + +func (resp *PrometheusResponse) WithHeaders(h []PrometheusResponseHeader) Response { + resp.Headers = convertPrometheusResponseHeadersToPointers(h) + return resp +} + // NewEmptyPrometheusResponse returns an empty successful Prometheus query range response. func NewEmptyPrometheusResponse() *PrometheusResponse { return &PrometheusResponse{ diff --git a/pkg/querier/queryrange/views.go b/pkg/querier/queryrange/views.go index 3f3f39907df5c..9751ce588d6e1 100644 --- a/pkg/querier/queryrange/views.go +++ b/pkg/querier/queryrange/views.go @@ -71,6 +71,11 @@ func (v *LokiSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusRespon return v.headers } +func (v *LokiSeriesResponseView) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + v.headers = convertPrometheusResponseHeadersToPointers(h) + return v +} + // Implement proto.Message func (v *LokiSeriesResponseView) Reset() {} func (v *LokiSeriesResponseView) String() string { return "" } @@ -240,6 +245,10 @@ func (v *MergedSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusResp return v.headers } +func (v *MergedSeriesResponseView) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + return v +} + // Implement proto.Message func (v *MergedSeriesResponseView) Reset() {} func (v *MergedSeriesResponseView) String() string { return "" }