Skip to content

Commit

Permalink
Set max bytes per tag values limits
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 committed Nov 24, 2023
1 parent ab88ef3 commit a56eb35
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
56 changes: 54 additions & 2 deletions modules/frontend/tagsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// execute requests
wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
taghandler := s.tagShardHandlerFactory(ctx, 0)
taghandler := s.tagShardHandlerFactory(ctx, s.overrides.MaxBytesPerTagValuesQuery(tenantID))

progress := taghandler.progress()
for req := range reqCh {
Expand Down Expand Up @@ -120,6 +120,7 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (*http.Response, error) {
}

err = taghandler.addResponseToResult(resp.Body)

if err != nil {
_ = level.Error(s.logger).Log("msg", "error reading response body status == ok", "url", innerR.RequestURI, "err", err)
return
Expand Down Expand Up @@ -208,10 +209,61 @@ func (s *searchTagSharder) backendRequests(
targetBytesPerRequest := s.cfg.TargetBytesPerRequest

go func() {
buildBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh)
buildTagsBackendRequests(ctx, tenantID, parent, blocks, targetBytesPerRequest, reqCh, stopCh)
}()
}

// buildBackendRequests returns a slice of requests that cover all blocks in the store
// that are covered by start/end.
func buildTagsBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *backendReqMsg, stopCh <-chan struct{}) {
defer close(reqCh)

for _, m := range metas {
pages := pagesPerRequest(m, bytesPerRequest)
if pages == 0 {
continue
}

blockID := m.BlockID.String()
for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)
subR.Header.Set(user.OrgIDHeaderName, tenantID)

dc, err := m.DedicatedColumns.ToTempopb()
if err != nil {
reqCh <- &backendReqMsg{err: err}
return
}

subR, err = api.BuildTagSearchBlockRequest(subR, &tempopb.SearchTagsBlockRequest{
BlockID: blockID,
StartPage: uint32(startPage),
PagesToSearch: uint32(pages),
Encoding: m.Encoding.String(),
IndexPageSize: m.IndexPageSize,
TotalRecords: m.TotalRecords,
DataEncoding: m.DataEncoding,
Version: m.Version,
Size_: m.Size,
FooterSize: m.FooterSize,
DedicatedColumns: dc,
})
if err != nil {
reqCh <- &backendReqMsg{err: err}
return
}

subR.RequestURI = buildUpstreamRequestURI(parent.URL.Path, subR.URL.Query())

select {
case reqCh <- &backendReqMsg{req: subR}:
case <-stopCh:
return
}
}
}
}

// ingesterRequest returns a new start and end time range for the backend as well as an http request
// that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query.
// since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from
Expand Down
29 changes: 29 additions & 0 deletions pkg/api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,35 @@ func BuildSearchTagRequest(req *http.Request, searchReq *tempopb.SearchTagsReque
return req, nil
}

func BuildTagSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchTagsBlockRequest) (*http.Request, error) {
if req == nil {
req = &http.Request{
URL: &url.URL{},
}
}

req, err := BuildSearchTagRequest(req, searchReq.SearchReq)
if err != nil {
return nil, err
}

q := req.URL.Query()
q.Set(urlParamSize, strconv.FormatUint(searchReq.Size_, 10))
q.Set(urlParamBlockID, searchReq.BlockID)
q.Set(urlParamStartPage, strconv.FormatUint(uint64(searchReq.StartPage), 10))
q.Set(urlParamPagesToSearch, strconv.FormatUint(uint64(searchReq.PagesToSearch), 10))
q.Set(urlParamEncoding, searchReq.Encoding)
q.Set(urlParamIndexPageSize, strconv.FormatUint(uint64(searchReq.IndexPageSize), 10))
q.Set(urlParamTotalRecords, strconv.FormatUint(uint64(searchReq.TotalRecords), 10))
q.Set(urlParamDataEncoding, searchReq.DataEncoding)
q.Set(urlParamVersion, searchReq.Version)
q.Set(urlParamFooterSize, strconv.FormatUint(uint64(searchReq.FooterSize), 10))

req.URL.RawQuery = q.Encode()

return req, nil
}

// BuildSearchBlockRequest takes a tempopb.SearchBlockRequest and populates the passed http.Request
// with the appropriate params. If no http.Request is provided a new one is created.
func BuildSearchBlockRequest(req *http.Request, searchReq *tempopb.SearchBlockRequest) (*http.Request, error) {
Expand Down

0 comments on commit a56eb35

Please sign in to comment.