diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 0059b4665842a..a548d05a9251b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -513,10 +513,17 @@ func (t *Loki) initQuerier() (services.Service, error) { t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler))) t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler))) - internalHandler := queryrangebase.MergeMiddlewares( + internalMiddlewares := []queryrangebase.Middleware{ serverutil.RecoveryMiddleware, queryrange.Instrument{Metrics: t.Metrics}, - ).Wrap(handler) + } + if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled { + internalMiddlewares = append( + internalMiddlewares, + queryrangebase.CacheGenNumberContextSetterMiddleware(t.cacheGenerationLoader), + ) + } + internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler) svc, err := querier.InitWorkerService( querierWorkerServiceConfig, diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 1752c9e1ca2fd..e3f1905f54d7c 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -383,6 +383,14 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest) ctx = httpreq.InjectQueryTags(ctx, queryTags) } + // Add query metrics + if queueTimeHeader := httpReq.Header.Get(string(httpreq.QueryQueueTimeHTTPHeader)); queueTimeHeader != "" { + queueTime, err := time.ParseDuration(queueTimeHeader) + if err == nil { + ctx = context.WithValue(ctx, httpreq.QueryQueueTimeHTTPHeader, queueTime) + } + } + // If there is not encoding flags in the context, we try the HTTP request. if encFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx); encFlags == nil { encFlags = httpreq.ExtractEncodingFlagsFromProto(r) @@ -524,13 +532,19 @@ func (Codec) EncodeHTTPGrpcResponse(_ context.Context, req *httpgrpc.HTTPRequest return nil, err } - return &httpgrpc.HTTPResponse{ + httpRes := &httpgrpc.HTTPResponse{ Code: int32(http.StatusOK), Body: buf.Bytes(), Headers: []*httpgrpc.Header{ {Key: "Content-Type", Values: []string{"application/json; charset=UTF-8"}}, }, - }, nil + } + + for _, h := range res.GetHeaders() { + httpRes.Headers = append(httpRes.Headers, &httpgrpc.Header{Key: h.Name, Values: h.Values}) + } + + return httpRes, nil } func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) { @@ -1593,7 +1607,7 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error) }, }, nil case *logproto.IndexStatsRequest: - return &logproto.IndexStatsResponse{}, nil + return &IndexStatsResponse{}, nil case *logproto.VolumeRequest: return &VolumeResponse{}, nil default: diff --git a/pkg/querier/queryrange/codec_test.go b/pkg/querier/queryrange/codec_test.go index 70814ff83d091..880143976439e 100644 --- a/pkg/querier/queryrange/codec_test.go +++ b/pkg/querier/queryrange/codec_test.go @@ -1421,6 +1421,9 @@ func (badResponse) Reset() {} func (badResponse) String() string { return "noop" } func (badResponse) ProtoMessage() {} func (badResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader { return nil } +func (b badResponse) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + return b +} type badReader struct{} diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index a3b14efdbe588..55f880af3f15d 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -273,7 +273,7 @@ func Test_MaxQueryLookBack_Types(t *testing.T) { From: model.Time(now.UnixMilli()), Through: model.Time(now.Add(-90 * time.Minute).UnixMilli()), }, - expectedResponse: &logproto.IndexStatsResponse{}, + expectedResponse: &IndexStatsResponse{}, }, { request: &logproto.VolumeRequest{ diff --git a/pkg/querier/queryrange/queryrangebase/definitions/interface.go b/pkg/querier/queryrange/queryrangebase/definitions/interface.go index fb385817b5ba6..f5259bb085c36 100644 --- a/pkg/querier/queryrange/queryrangebase/definitions/interface.go +++ b/pkg/querier/queryrange/queryrangebase/definitions/interface.go @@ -56,4 +56,5 @@ type Response interface { proto.Message // GetHeaders returns the HTTP headers in the response. GetHeaders() []*PrometheusResponseHeader + WithHeaders([]PrometheusResponseHeader) Response } diff --git a/pkg/querier/queryrange/queryrangebase/middleware.go b/pkg/querier/queryrange/queryrangebase/middleware.go index 4e60ce2dec490..6d72086da0694 100644 --- a/pkg/querier/queryrange/queryrangebase/middleware.go +++ b/pkg/querier/queryrange/queryrangebase/middleware.go @@ -1,10 +1,13 @@ package queryrangebase import ( + "context" "net/http" "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/tenant" + + "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions" ) const ( @@ -28,3 +31,25 @@ func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLo }) }) } + +func CacheGenNumberContextSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLoader) Middleware { + return MiddlewareFunc(func(next Handler) Handler { + return HandlerFunc(func(ctx context.Context, req Request) (Response, error) { + userIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + + cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs) + + res, err := next.Do(ctx, req) + if err != nil { + return nil, err + } + header := definitions.PrometheusResponseHeader{ + Name: ResultsCacheGenNumberHeaderName, + Values: []string{cacheGenNumber}} + return res.WithHeaders([]definitions.PrometheusResponseHeader{header}), nil + }) + }) +} diff --git a/pkg/querier/queryrange/queryrangebase/query_range.go b/pkg/querier/queryrange/queryrangebase/query_range.go index 4c7426b1714b1..a2653eb8967be 100644 --- a/pkg/querier/queryrange/queryrangebase/query_range.go +++ b/pkg/querier/queryrange/queryrangebase/query_range.go @@ -88,6 +88,24 @@ func (resp *PrometheusResponse) minTime() int64 { return result[0].Samples[0].TimestampMs } +func convertPrometheusResponseHeadersToPointers(h []PrometheusResponseHeader) []*PrometheusResponseHeader { + if h == nil { + return nil + } + + resp := make([]*PrometheusResponseHeader, len(h)) + for i := range h { + resp[i] = &h[i] + } + + return resp +} + +func (resp *PrometheusResponse) WithHeaders(h []PrometheusResponseHeader) Response { + resp.Headers = convertPrometheusResponseHeadersToPointers(h) + return resp +} + // NewEmptyPrometheusResponse returns an empty successful Prometheus query range response. func NewEmptyPrometheusResponse() *PrometheusResponse { return &PrometheusResponse{ diff --git a/pkg/querier/queryrange/views.go b/pkg/querier/queryrange/views.go index 3f3f39907df5c..9751ce588d6e1 100644 --- a/pkg/querier/queryrange/views.go +++ b/pkg/querier/queryrange/views.go @@ -71,6 +71,11 @@ func (v *LokiSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusRespon return v.headers } +func (v *LokiSeriesResponseView) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + v.headers = convertPrometheusResponseHeadersToPointers(h) + return v +} + // Implement proto.Message func (v *LokiSeriesResponseView) Reset() {} func (v *LokiSeriesResponseView) String() string { return "" } @@ -240,6 +245,10 @@ func (v *MergedSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusResp return v.headers } +func (v *MergedSeriesResponseView) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response { + return v +} + // Implement proto.Message func (v *MergedSeriesResponseView) Reset() {} func (v *MergedSeriesResponseView) String() string { return "" }