Skip to content

Commit

Permalink
More lint issues, move scopes resolution to querier
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 committed Nov 23, 2023
1 parent d2192f6 commit 9712d4d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 49 deletions.
4 changes: 2 additions & 2 deletions modules/frontend/searchsharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ func (m *mockReader) SearchForValuesTags(context.Context, *backend.BlockMeta, st
return nil, nil
}

func (m *mockReader) SearchForTagsV2(ctx context.Context, meta *backend.BlockMeta, scope string, opts common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {
func (m *mockReader) SearchForTagsV2(context.Context, *backend.BlockMeta, []string, common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {
return nil, nil
}

func (m *mockReader) SearchForValuesTagsV2(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchTagValuesRequest, opts common.SearchOptions) (*tempopb.SearchTagValuesV2Response, error) {
func (m *mockReader) SearchForValuesTagsV2(context.Context, *backend.BlockMeta, *tempopb.SearchTagValuesRequest, common.SearchOptions) (*tempopb.SearchTagValuesV2Response, error) {
return nil, nil
}

Expand Down
39 changes: 13 additions & 26 deletions modules/frontend/tagsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (*http.Response, error) {

// execute requests
wg := boundedwaitgroup.New(uint(s.cfg.ConcurrentRequests))
//progress := s.progress(ctx, int(searchReq.Limit))
taghandler := s.tagShardHandlerFactoy(ctx, int(searchReq.Limit))
taghandler := s.tagShardHandlerFactory(ctx, int(searchReq.Limit))

progress := taghandler.progress()
for req := range reqCh {
Expand Down Expand Up @@ -282,39 +281,27 @@ func (s *searchTagSharder) maxDuration(tenantID string) time.Duration {
return s.cfg.MaxDuration
}

type searchTagProgressFactory func(ctx context.Context, limit int) shardedTagsSearchProgress

type SearchTagSharderConfig struct {
ConcurrentRequests int `yaml:"concurrent_jobs,omitempty"`
TargetBytesPerRequest int `yaml:"target_bytes_per_job,omitempty"`
DefaultLimit uint32 `yaml:"default_result_limit"`
MaxLimit uint32 `yaml:"max_result_limit"`
MaxDuration time.Duration `yaml:"max_duration"`
QueryBackendAfter time.Duration `yaml:"query_backend_after,omitempty"`
QueryIngestersUntil time.Duration `yaml:"query_ingesters_until,omitempty"`
}

// newSearchSharder creates a sharding middleware for search
func newTagsSharding(reader tempodb.Reader, o overrides.Interface,
cfg SearchSharderConfig, tagShardHandler TagShardHandlerFactory, logger log.Logger,
) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return searchTagSharder{
next: next,
reader: reader,
overrides: o,
cfg: cfg,
logger: logger,
tagShardHandlerFactoy: tagShardHandler,
next: next,
reader: reader,
overrides: o,
cfg: cfg,
logger: logger,
tagShardHandlerFactory: tagShardHandler,
}
})
}

type searchTagSharder struct {
next http.RoundTripper
reader tempodb.Reader
overrides overrides.Interface
tagShardHandlerFactoy TagShardHandlerFactory
cfg SearchSharderConfig
logger log.Logger
next http.RoundTripper
reader tempodb.Reader
overrides overrides.Interface
tagShardHandlerFactory TagShardHandlerFactory
cfg SearchSharderConfig
logger log.Logger
}
1 change: 0 additions & 1 deletion modules/frontend/tagsharging_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func (t *TagNameHandler) marshalResponseForFrontendQuerier() (string, error) {
bodyString, err := m.MarshalToString(&tempopb.SearchTagsResponse{
TagNames: t.shardedProgress.result().response.([]string),
})

if err != nil {
return "", err
}
Expand Down
14 changes: 12 additions & 2 deletions modules/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,18 @@ func (q *Querier) internalTagSearchBlockV2(ctx context.Context, req *tempopb.Sea
opts.TotalPages = int(req.PagesToSearch)
opts.MaxBytes = q.limits.MaxBytesPerTrace(tenantID)

tags, err := q.store.SearchForTagsV2(ctx, meta, req.SearchReq.Scope, opts)
scopes := []string{req.SearchReq.Scope}
if req.SearchReq.Scope == "" {
// start with intrinsic scope and all traceql attribute scopes
atts := traceql.AllAttributeScopes()
scopes = make([]string, 0, len(atts)+1) // +1 for intrinsic
scopes = append(scopes, api.ParamScopeIntrinsic)
for _, att := range atts {
scopes = append(scopes, att.String())
}
}

tags, err := q.store.SearchForTagsV2(ctx, meta, scopes, opts)
return tags, err
}

Expand Down Expand Up @@ -984,7 +995,6 @@ func (q *Querier) internalTagValueSearchBlockV2(ctx context.Context, req *tempop
opts.MaxBytes = q.limits.MaxBytesPerTrace(tenantID)

return q.store.SearchForValuesTagsV2(ctx, meta, req.SearchReq, opts)

}

func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, rr []responseFromIngesters) *tempopb.SearchResponse {
Expand Down
29 changes: 11 additions & 18 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type Reader interface {
EnablePolling(ctx context.Context, sharder blocklist.JobSharder)
SearchForTags(ctx context.Context, meta *backend.BlockMeta, scope string, opts common.SearchOptions) (*tempopb.SearchTagsResponse, error)
SearchForValuesTags(ctx context.Context, meta *backend.BlockMeta, tag string, opts common.SearchOptions) ([]string, error)
SearchForTagsV2(ctx context.Context, meta *backend.BlockMeta, scope string, opts common.SearchOptions) (*tempopb.SearchTagsV2Response, error)
SearchForTagsV2(ctx context.Context, meta *backend.BlockMeta, scopes []string, opts common.SearchOptions) (*tempopb.SearchTagsV2Response, error)
SearchForValuesTagsV2(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchTagValuesRequest, opts common.SearchOptions) (*tempopb.SearchTagValuesV2Response, error)
Shutdown()
}
Expand Down Expand Up @@ -398,18 +398,7 @@ func (rw *readerWriter) SearchForValuesTags(ctx context.Context, meta *backend.B
return tags, err
}

func (rw *readerWriter) SearchForTagsV2(ctx context.Context, meta *backend.BlockMeta, scope string, opts common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {

scopes := []string{scope}
if scope == "" {
// start with intrinsic scope and all traceql attribute scopes
atts := traceql.AllAttributeScopes()
scopes = make([]string, 0, len(atts)+1) // +1 for intrinsic
//scopes = append(scopes, api.ParamScopeIntrinsic)
for _, att := range atts {
scopes = append(scopes, att.String())
}
}
func (rw *readerWriter) SearchForTagsV2(ctx context.Context, meta *backend.BlockMeta, scopes []string, opts common.SearchOptions) (*tempopb.SearchTagsV2Response, error) {
resps := make([]*tempopb.SearchTagsResponse, len(scopes))

overallError := atomic.NewError(nil)
Expand Down Expand Up @@ -448,8 +437,11 @@ func (rw *readerWriter) SearchForTagsV2(ctx context.Context, meta *backend.Block
}

func (rw *readerWriter) SearchForValuesTagsV2(ctx context.Context, meta *backend.BlockMeta, req *tempopb.SearchTagValuesRequest, opts common.SearchOptions) (*tempopb.SearchTagValuesV2Response, error) {

block, err := encoding.OpenBlock(meta, rw.r)
if err != nil {
return nil, err
}

var tagValues []*tempopb.TagValue

cb := func(v traceql.Static) bool {
Expand Down Expand Up @@ -493,7 +485,7 @@ func (rw *readerWriter) SearchForValuesTagsV2(ctx context.Context, meta *backend
}

query := extractMatchers(req.Query)
//TODO: check for autocomplete i.autocompleteFilteringEnabled
// TODO: check for autocomplete i.autocompleteFilteringEnabled
if len(query) > 0 {
fetcher := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
return block.Fetch(ctx, req, opts)
Expand All @@ -512,12 +504,13 @@ func (rw *readerWriter) SearchForValuesTagsV2(ctx context.Context, meta *backend
}

return resp, nil

}

// TODO: need to see where can I put this..
var singleFilterRegexp = regexp.MustCompile(`^{[a-zA-Z._\s\-()/&=<>~!0-9"]*}$`)
var matchersRegexp = regexp.MustCompile(`[a-zA-Z._]+\s*[=|<=|>=|=~|!=|>|<|!~]\s*(?:"[a-zA-Z./_0-9-]+"|[0-9smh]+|true|false)`)
var (
singleFilterRegexp = regexp.MustCompile(`^{[a-zA-Z._\s\-()/&=<>~!0-9"]*}$`)
matchersRegexp = regexp.MustCompile(`[a-zA-Z._]+\s*[=|<=|>=|=~|!=|>|<|!~]\s*(?:"[a-zA-Z./_0-9-]+"|[0-9smh]+|true|false)`)
)

// extractMatchers extracts matchers from a query string and returns a string that can be parsed by the storage layer.
func extractMatchers(query string) string {
Expand Down

0 comments on commit 9712d4d

Please sign in to comment.