diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index d5950c151..d3b059ae5 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -19,6 +19,7 @@ package storage import ( "context" + "maps" "path" "github.com/pkg/errors" @@ -27,7 +28,6 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" - "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query" @@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB { } func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) { - sl, _, err := s.index.searchPrimary(ctx, series, nil) + sl, _, err := s.index.filter(ctx, series, nil, nil) return sl, err } @@ -70,7 +70,9 @@ func (s *seriesIndex) Write(docs index.Documents) error { var rangeOpts = index.RangeOpts{} -func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields FieldResultList, err error) { +func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series, + projection []index.FieldKey, secondaryQuery index.Query, +) (sl pbv1.SeriesList, fields FieldResultList, err error) { seriesMatchers := make([]index.SeriesMatcher, len(series)) for i := range series { seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i]) @@ -78,10 +80,14 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, return nil, nil, err } } + indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery) + if err != nil { + return nil, nil, err + } tracer := query.GetTracer(ctx) if tracer != nil { - span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary") - span.Tagf("matchers", "%v", seriesMatchers) + span, _ := tracer.StartSpan(ctx, "seriesIndex.search") + span.Tagf("query", "%s", indexQuery.String()) defer func() { span.Tagf("matched", "%d", len(sl)) if len(fields) > 0 { @@ -93,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, span.Stop() }() } - ss, err := s.store.Search(ctx, seriesMatchers, projection) + ss, err := s.store.Search(ctx, projection, indexQuery) if err != nil { return nil, nil, err } @@ -191,44 +197,19 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In span.Stop() }() } - seriesList, fieldResultList, err := s.searchPrimary(ctx, series, opts.Projection) - if err != nil { - return nil, nil, err - } - pl := seriesList.ToList() - if opts.Query != nil { - var plFilter posting.List - func() { - if tracer != nil { - span, _ := tracer.StartSpan(ctx, "filter") - span.Tag("exp", opts.Query.String()) - defer func() { - if err != nil { - span.Error(err) - } else { - span.Tagf("matched", "%d", plFilter.Len()) - span.Tagf("total", "%d", pl.Len()) - } - span.Stop() - }() - } - if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil { - return - } - if plFilter == nil { - return - } - err = pl.Intersect(plFilter) - }() + if opts.Order == nil || opts.Order.Index == nil { + var seriesList pbv1.SeriesList + var fieldResultList FieldResultList + if opts.Query != nil { + seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, opts.Query) + } else { + seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, nil) + } if err != nil { return nil, nil, err } - } - - if opts.Order == nil || opts.Order.Index == nil { - sl, frl = filterSeriesList(seriesList, fieldResultList, pl) - return sl, frl, nil + return seriesList, fieldResultList, nil } fieldKey := index.FieldKey{ @@ -245,8 +226,19 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In span.Stop() }() } + seriesMatchers := make([]index.SeriesMatcher, len(series)) + for i := range series { + seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i]) + if err != nil { + return nil, nil, err + } + } + query, err := s.store.BuildQuery(seriesMatchers, opts.Query) + if err != nil { + return nil, nil, err + } iter, err := s.store.Iterator(fieldKey, rangeOpts, - opts.Order.Sort, opts.PreloadSize) + opts.Order.Sort, opts.PreloadSize, query, opts.Projection) if err != nil { return nil, nil, err } @@ -254,56 +246,29 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In err = multierr.Append(err, iter.Close()) }() - var sortedSeriesList pbv1.SeriesList - var sortedFieldResultList FieldResultList var r int + result := make([]index.SeriesDocument, 0, 10) for iter.Next() { r++ - docID := iter.Val().DocID - if !pl.Contains(docID) { - continue - } - sortedSeriesList, sortedFieldResultList = appendSeriesList( - sortedSeriesList, seriesList, - sortedFieldResultList, fieldResultList, - common.SeriesID(docID)) - if err != nil { - return nil, nil, err - } + val := iter.Val() + var doc index.SeriesDocument + doc.Fields = maps.Clone(val.Values) + doc.Key.ID = common.SeriesID(val.DocID) + doc.Key.EntityValues = val.EntityValues + result = append(result, doc) + } + sortedSeriesList, sortedFieldResultList, err := convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0) + if err != nil { + return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result)) } if span != nil { + span.Tagf("query", "%s", iter.Query().String()) span.Tagf("rounds", "%d", r) span.Tagf("size", "%d", len(sortedSeriesList)) } return sortedSeriesList, sortedFieldResultList, err } -func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) { - for i := 0; i < len(seriesList); i++ { - if !filter.Contains(uint64(seriesList[i].ID)) { - seriesList = append(seriesList[:i], seriesList[i+1:]...) - if fieldResultList != nil { - fieldResultList = append(fieldResultList[:i], fieldResultList[i+1:]...) - } - i-- - } - } - return seriesList, fieldResultList -} - -func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) { - for i := 0; i < len(src); i++ { - if target == src[i].ID { - dest = append(dest, src[i]) - if srcFRL != nil { - destFRL = append(destFRL, srcFRL[i]) - } - break - } - } - return dest, destFRL -} - func (s *seriesIndex) Close() error { return s.store.Close() } diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index d73886a35..2b196ec79 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -157,7 +157,7 @@ func TestSeriesIndex_Primary(t *testing.T) { seriesQuery.EntityValues = tt.entityValues[i] seriesQueries = append(seriesQueries, &seriesQuery) } - sl, _, err := si.searchPrimary(ctx, seriesQueries, nil) + sl, _, err := si.filter(ctx, seriesQueries, nil, nil) require.NoError(t, err) require.Equal(t, len(tt.entityValues), len(sl)) assert.Equal(t, tt.subject, sl[0].Subject) diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index fb7e0af69..47b82639b 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -34,7 +34,6 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" @@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T // IndexSearchOpts is the options for searching index. type IndexSearchOpts struct { - Query *inverted.Query + Query index.Query Order *model.OrderBy Projection []index.FieldKey PreloadSize int diff --git a/pkg/index/index.go b/pkg/index/index.go index 8234f4965..a7da177ba 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -24,8 +24,6 @@ import ( "fmt" "io" - "github.com/blugelabs/bluge" - "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" @@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int { // DocumentResult represents a document in an index. type DocumentResult struct { - Values map[string][]byte - SortedValue []byte - SeriesID common.SeriesID - DocID uint64 - Timestamp int64 + EntityValues []byte + Values map[string][]byte + SortedValue []byte + SeriesID common.SeriesID + DocID uint64 + Timestamp int64 } // SortedField returns the value of the sorted field. @@ -114,6 +113,7 @@ type FieldIterator[T sort.Comparable] interface { Next() bool Val() T Close() error + Query() Query } // DummyFieldIterator never iterates. @@ -133,6 +133,10 @@ func (i *dummyIterator) Close() error { return nil } +func (i *dummyIterator) Query() Query { + return nil +} + // Document represents a document in an index. type Document struct { Fields []Field @@ -156,7 +160,8 @@ type Writer interface { // FieldIterable allows building a FieldIterator. type FieldIterable interface { - Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error) + BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error) + Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error) Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error) } @@ -171,9 +176,7 @@ type Searcher interface { // Query is an abstract of an index query. type Query interface { - bluge.Query fmt.Stringer - Query() bluge.Query } // Store is an abstract of an index repository. @@ -204,8 +207,7 @@ type SeriesDocument struct { type SeriesStore interface { Store // Search returns a list of series that match the given matchers. - Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error) - Execute(context.Context, Query) (posting.List, error) + Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error) } // SeriesMatcherType represents the type of series matcher. diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index 81af6e208..866aacc7c 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -174,8 +174,8 @@ func (s *store) Close() error { return s.writer.Close() } -func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, - order modelv1.Sort, preLoadSize int, +func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, order modelv1.Sort, + preLoadSize int, indexQuery index.Query, fieldKeys []index.FieldKey, ) (iter index.FieldIterator[*index.DocumentResult], err error) { if termRange.Lower != nil && termRange.Upper != nil && @@ -191,7 +191,8 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, return nil, err } fk := fieldKey.Marshal() - query := bluge.NewBooleanQuery() + rangeQuery := bluge.NewBooleanQuery() + rangeNode := newMustNode() addRange := func(query *bluge.BooleanQuery, termRange index.RangeOpts) *bluge.BooleanQuery { if termRange.Upper == nil { termRange.Upper = defaultUpper @@ -206,25 +207,39 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, termRange.IncludesUpper, ). SetField(fk)) + rangeNode.Append(newTermRangeInclusiveNode(string(termRange.Lower), string(termRange.Upper), termRange.IncludesLower, termRange.IncludesUpper, nil)) return query } if fieldKey.HasSeriesID() { - query = query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())). + rangeQuery = rangeQuery.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())). SetField(seriesIDField)) + rangeNode.Append(newTermNode(string(fieldKey.SeriesID.Marshal()), nil)) if termRange.Lower != nil || termRange.Upper != nil { - query = addRange(query, termRange) + rangeQuery = addRange(rangeQuery, termRange) } } else { - query = addRange(query, termRange) + rangeQuery = addRange(rangeQuery, termRange) } sortedKey := fk if order == modelv1.Sort_SORT_DESC { sortedKey = "-" + sortedKey } + query := bluge.NewBooleanQuery().AddMust(rangeQuery) + node := newMustNode() + node.Append(rangeNode) + if indexQuery != nil && indexQuery.(*queryNode).query != nil { + query.AddMust(indexQuery.(*queryNode).query) + node.Append(indexQuery.(*queryNode).node) + } + fields := make([]string, 0, len(fieldKeys)) + for i := range fieldKeys { + fields = append(fields, fieldKeys[i].Marshal()) + } result := &sortIterator{ - query: query, + query: &queryNode{query, node}, + fields: fields, reader: reader, sortedKey: sortedKey, size: preLoadSize, @@ -298,7 +313,7 @@ func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, } func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posting.List, err error) { - iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize) + iter, err := s.Iterator(fieldKey, opts, modelv1.Sort_SORT_ASC, defaultRangePreloadSize, nil, nil) if err != nil { return roaring.DummyPostingList, err } @@ -310,26 +325,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts index.RangeOpts) (list posti return } -func (s *store) Execute(ctx context.Context, query index.Query) (posting.List, error) { - reader, err := s.writer.Reader() - if err != nil { - return nil, err - } - documentMatchIterator, err := reader.Search(ctx, bluge.NewAllMatches(query.Query())) - if err != nil { - return nil, err - } - iter := newBlugeMatchIterator(documentMatchIterator, reader, nil) - defer func() { - err = multierr.Append(err, iter.Close()) - }() - list := roaring.NewPostingList() - for iter.Next() { - list.Insert(iter.Val().DocID) - } - return list, err -} - func (s *store) SizeOnDisk() int64 { _, bytes := s.writer.DirectoryStats() return int64(bytes) @@ -380,6 +375,8 @@ func (bmi *blugeMatchIterator) Next() bool { } err := match.VisitStoredFields(func(field string, value []byte) bool { switch field { + case entityField: + bmi.current.EntityValues = value case docIDField: bmi.current.DocID = convert.BytesToUint64(value) case seriesIDField: diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index 02852397c..7c3e31f27 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -33,48 +33,71 @@ import ( var emptySeries = make([]index.SeriesDocument, 0) -// Search implements index.SeriesStore. -func (s *store) Search(ctx context.Context, seriesMatchers []index.SeriesMatcher, projection []index.FieldKey) ([]index.SeriesDocument, error) { +// BuildQuery implements index.SeriesStore. +func (s *store) BuildQuery(seriesMatchers []index.SeriesMatcher, secondaryQuery index.Query) (index.Query, error) { if len(seriesMatchers) == 0 { - return emptySeries, nil - } - reader, err := s.writer.Reader() - if err != nil { - return nil, err + return secondaryQuery, nil } - defer func() { - _ = reader.Close() - }() + qs := make([]bluge.Query, len(seriesMatchers)) + primaryNode := newShouldNode() for i := range seriesMatchers { switch seriesMatchers[i].Type { case index.SeriesMatcherTypeExact: - q := bluge.NewTermQuery(convert.BytesToString(seriesMatchers[i].Match)) + match := convert.BytesToString(seriesMatchers[i].Match) + q := bluge.NewTermQuery(match) q.SetField(entityField) qs[i] = q + primaryNode.Append(newTermNode(match, nil)) case index.SeriesMatcherTypePrefix: - q := bluge.NewPrefixQuery(convert.BytesToString(seriesMatchers[i].Match)) + match := convert.BytesToString(seriesMatchers[i].Match) + q := bluge.NewPrefixQuery(match) q.SetField(entityField) qs[i] = q + primaryNode.Append(newPrefixNode(match, nil)) case index.SeriesMatcherTypeWildcard: - q := bluge.NewWildcardQuery(convert.BytesToString(seriesMatchers[i].Match)) + match := convert.BytesToString(seriesMatchers[i].Match) + q := bluge.NewWildcardQuery(match) q.SetField(entityField) qs[i] = q + primaryNode.Append(newWildcardNode(match, nil)) default: return nil, errors.Errorf("unsupported series matcher type: %v", seriesMatchers[i].Type) } } - var query bluge.Query + var primaryQuery bluge.Query if len(qs) > 1 { bq := bluge.NewBooleanQuery() bq.AddShould(qs...) bq.SetMinShould(1) - query = bq + primaryQuery = bq } else { - query = qs[0] + primaryQuery = qs[0] + } + + query := bluge.NewBooleanQuery().AddMust(primaryQuery) + node := newMustNode() + node.Append(primaryNode) + if secondaryQuery != nil && secondaryQuery.(*queryNode).query != nil { + query.AddMust(secondaryQuery.(*queryNode).query) + node.Append(secondaryQuery.(*queryNode).node) } + return &queryNode{query, node}, nil +} + +// Search implements index.SeriesStore. +func (s *store) Search(ctx context.Context, + projection []index.FieldKey, query index.Query, +) ([]index.SeriesDocument, error) { + reader, err := s.writer.Reader() + if err != nil { + return nil, err + } + defer func() { + _ = reader.Close() + }() - dmi, err := reader.Search(ctx, bluge.NewAllMatches(query)) + dmi, err := reader.Search(ctx, bluge.NewAllMatches(query.(*queryNode).query)) if err != nil { return nil, err } diff --git a/pkg/index/inverted/inverted_series_test.go b/pkg/index/inverted/inverted_series_test.go index 03a5e1813..647189d9a 100644 --- a/pkg/index/inverted/inverted_series_test.go +++ b/pkg/index/inverted/inverted_series_test.go @@ -191,7 +191,9 @@ func TestStore_Search(t *testing.T) { name += string(term) + "-" } t.Run(name, func(t *testing.T) { - got, err := s.Search(context.Background(), matchers, tt.projection) + query, err := s.BuildQuery(matchers, nil) + require.NoError(t, err) + got, err := s.Search(context.Background(), tt.projection, query) require.NoError(t, err) assert.Equal(t, tt.want, got) }) @@ -273,12 +275,14 @@ func TestStore_SearchWildcard(t *testing.T) { for _, tt := range tests { t.Run(string(tt.wildcard), func(t *testing.T) { - got, err := s.Search(context.Background(), []index.SeriesMatcher{ + query, err := s.BuildQuery([]index.SeriesMatcher{ { Type: index.SeriesMatcherTypeWildcard, Match: tt.wildcard, }, - }, tt.projection) + }, nil) + require.NoError(t, err) + got, err := s.Search(context.Background(), tt.projection, query) require.NoError(t, err) assert.ElementsMatch(t, tt.want, got) }) @@ -338,12 +342,14 @@ func TestStore_SearchPrefix(t *testing.T) { for _, tt := range tests { t.Run(string(tt.prefix), func(t *testing.T) { - got, err := s.Search(context.Background(), []index.SeriesMatcher{ + query, err := s.BuildQuery([]index.SeriesMatcher{ { Type: index.SeriesMatcherTypePrefix, Match: tt.prefix, }, - }, tt.projection) + }, nil) + require.NoError(t, err) + got, err := s.Search(context.Background(), tt.projection, query) require.NoError(t, err) assert.ElementsMatch(t, tt.want, got) }) diff --git a/pkg/index/inverted/query.go b/pkg/index/inverted/query.go index e3f90fda3..b34fd7195 100644 --- a/pkg/index/inverted/query.go +++ b/pkg/index/inverted/query.go @@ -24,12 +24,12 @@ import ( "strings" "github.com/blugelabs/bluge" - "github.com/blugelabs/bluge/search" "github.com/pkg/errors" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/logical" ) @@ -49,30 +49,22 @@ type GlobalIndexError struct { func (g GlobalIndexError) Error() string { return g.IndexRule.String() } -// Query is a wrapper for bluge.Query. -type Query struct { +var _ index.Query = (*queryNode)(nil) + +// queryNode is a wrapper for bluge.Query. +type queryNode struct { query bluge.Query node } -// Searcher implements index.Query. -func (q *Query) Searcher(i search.Reader, options search.SearcherOptions) (search.Searcher, error) { - return q.query.Searcher(i, options) -} - -func (q *Query) String() string { +func (q *queryNode) String() string { return q.node.String() } -// Query implements index.Query. -func (q *Query) Query() bluge.Query { - return q.query -} - // BuildLocalQuery returns blugeQuery for local indices. func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDict map[string]int, entity []*modelv1.TagValue, -) (*Query, [][]*modelv1.TagValue, bool, error) { +) (index.Query, [][]*modelv1.TagValue, bool, error) { if criteria == nil { return nil, [][]*modelv1.TagValue{entity}, false, nil } @@ -117,7 +109,7 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDi return nil, entities, false, nil } if leftIsMatchAllQuery && rightIsMatchAllQuery { - return &Query{ + return &queryNode{ query: bluge.NewMatchAllQuery(), node: newMatchAllNode(), }, entities, true, nil @@ -126,17 +118,17 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDi case modelv1.LogicalExpression_LOGICAL_OP_AND: query, node := bluge.NewBooleanQuery(), newMustNode() if left != nil { - query.AddMust(left.query) - node.Append(left.node) + query.AddMust(left.(*queryNode).query) + node.Append(left.(*queryNode).node) } if right != nil { - query.AddMust(right.query) - node.Append(right.node) + query.AddMust(right.(*queryNode).query) + node.Append(right.(*queryNode).node) } - return &Query{query, node}, entities, false, nil + return &queryNode{query, node}, entities, false, nil case modelv1.LogicalExpression_LOGICAL_OP_OR: if leftIsMatchAllQuery || rightIsMatchAllQuery { - return &Query{ + return &queryNode{ query: bluge.NewMatchAllQuery(), node: newMatchAllNode(), }, entities, true, nil @@ -144,14 +136,14 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDi query, node := bluge.NewBooleanQuery(), newShouldNode() query.SetMinShould(1) if left != nil { - query.AddShould(left.query) - node.Append(left.node) + query.AddShould(left.(*queryNode).query) + node.Append(left.(*queryNode).node) } if right != nil { - query.AddShould(right.query) - node.Append(right.node) + query.AddShould(right.(*queryNode).query) + node.Append(right.(*queryNode).node) } - return &Query{query, node}, entities, false, nil + return &queryNode{query, node}, entities, false, nil } } return nil, nil, false, logical.ErrInvalidCriteriaType @@ -159,11 +151,11 @@ func BuildLocalQuery(criteria *modelv1.Criteria, schema logical.Schema, entityDi func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexRule, expr logical.LiteralExpr, entity []*modelv1.TagValue, -) (*Query, [][]*modelv1.TagValue, bool, error) { +) (*queryNode, [][]*modelv1.TagValue, bool, error) { field := string(convert.Uint32ToBytes(indexRule.Metadata.Id)) b := expr.Bytes() if len(b) < 1 { - return &Query{ + return &queryNode{ query: bluge.NewMatchAllQuery(), node: newMatchAllNode(), }, [][]*modelv1.TagValue{entity}, true, nil @@ -173,32 +165,32 @@ func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexR case modelv1.Condition_BINARY_OP_GT: query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, false, false).SetField(field) node := newTermRangeInclusiveNode(str, maxInf, false, false, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_GE: query := bluge.NewTermRangeInclusiveQuery(term, maxTerm, true, false).SetField(field) node := newTermRangeInclusiveNode(str, maxInf, true, false, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_LT: query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, false).SetField(field) node := newTermRangeInclusiveNode(minInf, str, false, false, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_LE: query := bluge.NewTermRangeInclusiveQuery(minTerm, term, false, true).SetField(field) node := newTermRangeInclusiveNode(minInf, str, false, true, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_EQ: query := bluge.NewTermQuery(term).SetField(field) node := newTermNode(str, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_MATCH: query := bluge.NewMatchQuery(term).SetField(field).SetAnalyzer(Analyzers[indexRule.Analyzer]) node := newMatchNode(str, indexRule) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_NE: query, node := bluge.NewBooleanQuery(), newMustNotNode() query.AddMustNot(bluge.NewTermQuery(term).SetField(field)) node.SetSubNode(newTermNode(str, indexRule)) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_HAVING: bb, elements := expr.Bytes(), expr.Elements() query, node := bluge.NewBooleanQuery(), newMustNode() @@ -208,7 +200,7 @@ func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexR for _, e := range elements { node.Append(newTermNode(e, indexRule)) } - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_NOT_HAVING: bb, elements := expr.Bytes(), expr.Elements() subQuery, subNode := bluge.NewBooleanQuery(), newMustNode() @@ -221,7 +213,7 @@ func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexR query, node := bluge.NewBooleanQuery(), newMustNotNode() query.AddMustNot(subQuery) node.SetSubNode(node) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_IN: bb, elements := expr.Bytes(), expr.Elements() query, node := bluge.NewBooleanQuery(), newShouldNode() @@ -232,7 +224,7 @@ func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexR for _, e := range elements { node.Append(newTermNode(e, indexRule)) } - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil case modelv1.Condition_BINARY_OP_NOT_IN: bb, elements := expr.Bytes(), expr.Elements() subQuery, subNode := bluge.NewBooleanQuery(), newShouldNode() @@ -246,7 +238,7 @@ func parseConditionToQuery(cond *modelv1.Condition, indexRule *databasev1.IndexR query, node := bluge.NewBooleanQuery(), newMustNotNode() query.AddMustNot(subQuery) node.SetSubNode(subNode) - return &Query{query, node}, [][]*modelv1.TagValue{entity}, false, nil + return &queryNode{query, node}, [][]*modelv1.TagValue{entity}, false, nil } return nil, nil, false, errors.WithMessagef(logical.ErrUnsupportedConditionOp, "index filter parses %v", cond) } @@ -429,3 +421,53 @@ func (m *matchNode) MarshalJSON() ([]byte, error) { func (m *matchNode) String() string { return convert.JSONToString(m) } + +type prefixNode struct { + indexRule *databasev1.IndexRule + prefix string +} + +func newPrefixNode(prefix string, indexRule *databasev1.IndexRule) *prefixNode { + return &prefixNode{ + indexRule: indexRule, + prefix: prefix, + } +} + +func (m *prefixNode) MarshalJSON() ([]byte, error) { + inner := make(map[string]interface{}, 1) + inner["index"] = m.indexRule.Metadata.Name + ":" + m.indexRule.Metadata.Group + inner["value"] = m.prefix + data := make(map[string]interface{}, 1) + data["prefix"] = inner + return json.Marshal(data) +} + +func (m *prefixNode) String() string { + return convert.JSONToString(m) +} + +type wildcardNode struct { + indexRule *databasev1.IndexRule + wildcard string +} + +func newWildcardNode(wildcard string, indexRule *databasev1.IndexRule) *wildcardNode { + return &wildcardNode{ + indexRule: indexRule, + wildcard: wildcard, + } +} + +func (m *wildcardNode) MarshalJSON() ([]byte, error) { + inner := make(map[string]interface{}, 1) + inner["index"] = m.indexRule.Metadata.Name + ":" + m.indexRule.Metadata.Group + inner["value"] = m.wildcard + data := make(map[string]interface{}, 1) + data["wildcard"] = inner + return json.Marshal(data) +} + +func (m *wildcardNode) String() string { + return convert.JSONToString(m) +} diff --git a/pkg/index/inverted/sort.go b/pkg/index/inverted/sort.go index c63b4c42d..730b9e363 100644 --- a/pkg/index/inverted/sort.go +++ b/pkg/index/inverted/sort.go @@ -69,7 +69,7 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order mode sortedKey = "-" + sortedKey } result := &sortIterator{ - query: query, + query: &queryNode{query: query}, reader: reader, sortedKey: sortedKey, size: preLoadSize, @@ -78,12 +78,13 @@ func (s *store) Sort(sids []common.SeriesID, fieldKey index.FieldKey, order mode } type sortIterator struct { - query bluge.Query + query index.Query err error reader *bluge.Reader current *blugeMatchIterator closer *run.Closer sortedKey string + fields []string size int skipped int } @@ -109,7 +110,7 @@ func (si *sortIterator) loadCurrent() bool { // overflow size = math.MaxInt64 } - topNSearch := bluge.NewTopNSearch(size, si.query).SortBy([]string{si.sortedKey}) + topNSearch := bluge.NewTopNSearch(size, si.query.(*queryNode).query).SortBy([]string{si.sortedKey}) if si.skipped > 0 { topNSearch = topNSearch.SetFrom(si.skipped) } @@ -120,7 +121,7 @@ func (si *sortIterator) loadCurrent() bool { return false } - iter := newBlugeMatchIterator(documentMatchIterator, nil, nil) + iter := newBlugeMatchIterator(documentMatchIterator, nil, si.fields) si.current = &iter if si.next() { return true @@ -159,3 +160,7 @@ func (si *sortIterator) Close() error { } return errors.Join(si.err, si.current.Close(), si.reader.Close()) } + +func (si *sortIterator) Query() index.Query { + return si.query +} diff --git a/pkg/index/testcases/duration.go b/pkg/index/testcases/duration.go index 83033e3ec..45e3aef28 100644 --- a/pkg/index/testcases/duration.go +++ b/pkg/index/testcases/duration.go @@ -288,7 +288,7 @@ func RunDuration(t *testing.T, data map[int]posting.List, store SimpleStore) { t.Run(tt.name, func(t *testing.T) { tester := assert.New(t) is := require.New(t) - iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize) + iter, err := store.Iterator(tt.args.fieldKey, tt.args.termRange, tt.args.orderType, tt.preloadSize, nil, nil) is.NoError(err) if iter == nil { tester.Empty(tt.want) diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index a833023fd..1d74d375a 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -27,6 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -114,7 +115,7 @@ var ( ) type localIndexScan struct { - query *inverted.Query + query index.Query schema logical.Schema uis *unresolvedIndexScan order *logical.OrderBy diff --git a/pkg/query/model/model.go b/pkg/query/model/model.go index 20e98659a..71054029a 100644 --- a/pkg/query/model/model.go +++ b/pkg/query/model/model.go @@ -25,7 +25,6 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -73,7 +72,7 @@ const ( // MeasureQueryOptions is the options of a measure query. type MeasureQueryOptions struct { - Query *inverted.Query + Query index.Query TimeRange *timestamp.TimeRange Order *OrderBy Name string