diff --git a/modules/frontend/tagsharding.go b/modules/frontend/tagsharding.go index d85b9aaf6b2..216b92253d0 100644 --- a/modules/frontend/tagsharding.go +++ b/modules/frontend/tagsharding.go @@ -76,7 +76,7 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (*http.Response, error) { // execute requests wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests)) - taghandler := s.tagShardHandlerFactory(ctx, 0) + taghandler := s.tagShardHandlerFactory(ctx, s.overrides.MaxBytesPerTagValuesQuery(tenantID)) progress := taghandler.progress() for req := range reqCh { @@ -120,6 +120,7 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (*http.Response, error) { } err = taghandler.addResponseToResult(resp.Body) + if err != nil { _ = level.Error(s.logger).Log("msg", "error reading response body status == ok", "url", innerR.RequestURI, "err", err) return @@ -208,10 +209,61 @@ func (s *searchTagSharder) backendRequests( targetBytesPerRequest := s.cfg.TargetBytesPerRequest go func() { - buildBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh) + buildTagsBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh) }() } +// buildBackendRequests returns a slice of requests that cover all blocks in the store +// that are covered by start/end. +func buildTagsBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) { + defer close(reqCh) + + for _, m := range metas { + pages := pagesPerRequest(m, bytesPerRequest) + if pages == 0 { + continue + } + + blockID := m.BlockID.String() + for startPage := 0; startPage < int(m.TotalRecords); startPage += pages { + subR := parent.Clone(ctx) + subR.Header.Set(user.OrgIDHeaderName, tenantID) + + dc, err := m.DedicatedColumns.ToTempopb() + if err != nil { + reqCh <- &backendReqMsg{err: err} + return + } + + subR, err = api.BuildTagSearchBlockRequest(subR, &tempopb.SearchTagsBlockRequest{ + BlockID: blockID, + StartPage: uint32(startPage), + PagesToSearch: uint32(pages), + Encoding: m.Encoding.String(), + IndexPageSize: m.IndexPageSize, + TotalRecords: m.TotalRecords, + DataEncoding: m.DataEncoding, + Version: m.Version, + Size_: m.Size, + FooterSize: m.FooterSize, + DedicatedColumns: dc, + }) + if err != nil { + reqCh <- &backendReqMsg{err: err} + return + } + + subR.RequestURI = buildUpstreamRequestURI(parent.URL.Path, subR.URL.Query()) + + select { + case reqCh <- &backendReqMsg{req: subR}: + case <-stopCh: + return + } + } + } +} + // ingesterRequest returns a new start and end time range for the backend as well as an http request // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from diff --git a/pkg/api/http.go b/pkg/api/http.go index d1c87967bd1..d0cc0b92852 100644 --- a/pkg/api/http.go +++ b/pkg/api/http.go @@ -710,6 +710,35 @@ func BuildSearchTagRequest(req *http.Request, searchReq *tempopb.SearchTagsReque return req, nil } +func BuildTagSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchTagsBlockRequest) (*http.Request, error) { + if req == nil { + req = &http.Request{ + URL: &url.URL{}, + } + } + + req, err := BuildSearchTagRequest(req, searchReq.SearchReq) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Set(urlParamSize, strconv.FormatUint(searchReq.Size_, 10)) + q.Set(urlParamBlockID, searchReq.BlockID) + q.Set(urlParamStartPage, strconv.FormatUint(uint64(searchReq.StartPage), 10)) + q.Set(urlParamPagesToSearch, strconv.FormatUint(uint64(searchReq.PagesToSearch), 10)) + q.Set(urlParamEncoding, searchReq.Encoding) + q.Set(urlParamIndexPageSize, strconv.FormatUint(uint64(searchReq.IndexPageSize), 10)) + q.Set(urlParamTotalRecords, strconv.FormatUint(uint64(searchReq.TotalRecords), 10)) + q.Set(urlParamDataEncoding, searchReq.DataEncoding) + q.Set(urlParamVersion, searchReq.Version) + q.Set(urlParamFooterSize, strconv.FormatUint(uint64(searchReq.FooterSize), 10)) + + req.URL.RawQuery = q.Encode() + + return req, nil +} + // BuildSearchBlockRequest takes a tempopb.SearchBlockRequest and populates the passed http.Request // with the appropriate params. If no http.Request is provided a new one is created. func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest) (*http.Request, error) {