Skip to content

Commit

Permalink
Merge the query process of series index (#505)
Browse files Browse the repository at this point in the history
* Merge queries of primary index and secondary index

---------

Co-authored-by: Gao Hongtao <[email protected]>
  • Loading branch information
ButterBright and hanahmily authored Aug 8, 2024
1 parent a20d7ae commit 4588b55
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 193 deletions.
125 changes: 45 additions & 80 deletions banyand/internal/storage/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package storage

import (
"context"
"maps"
"path"

"github.com/pkg/errors"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query"
Expand All @@ -38,7 +38,7 @@ func (s *segment[T, O]) IndexDB() IndexDB {
}

func (s *segment[T, O]) Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, error) {
sl, _, err := s.index.searchPrimary(ctx, series, nil)
sl, _, err := s.index.filter(ctx, series, nil, nil)
return sl, err
}

Expand Down Expand Up @@ -70,18 +70,24 @@ func (s *seriesIndex) Write(docs index.Documents) error {

var rangeOpts = index.RangeOpts{}

func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series, projection []index.FieldKey) (sl pbv1.SeriesList, fields FieldResultList, err error) {
func (s *seriesIndex) filter(ctx context.Context, series []*pbv1.Series,
projection []index.FieldKey, secondaryQuery index.Query,
) (sl pbv1.SeriesList, fields FieldResultList, err error) {
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
}
}
indexQuery, err := s.store.BuildQuery(seriesMatchers, secondaryQuery)
if err != nil {
return nil, nil, err
}
tracer := query.GetTracer(ctx)
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "seriesIndex.searchPrimary")
span.Tagf("matchers", "%v", seriesMatchers)
span, _ := tracer.StartSpan(ctx, "seriesIndex.search")
span.Tagf("query", "%s", indexQuery.String())
defer func() {
span.Tagf("matched", "%d", len(sl))
if len(fields) > 0 {
Expand All @@ -93,7 +99,7 @@ func (s *seriesIndex) searchPrimary(ctx context.Context, series []*pbv1.Series,
span.Stop()
}()
}
ss, err := s.store.Search(ctx, seriesMatchers, projection)
ss, err := s.store.Search(ctx, projection, indexQuery)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -191,44 +197,19 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesList, fieldResultList, err := s.searchPrimary(ctx, series, opts.Projection)
if err != nil {
return nil, nil, err
}

pl := seriesList.ToList()
if opts.Query != nil {
var plFilter posting.List
func() {
if tracer != nil {
span, _ := tracer.StartSpan(ctx, "filter")
span.Tag("exp", opts.Query.String())
defer func() {
if err != nil {
span.Error(err)
} else {
span.Tagf("matched", "%d", plFilter.Len())
span.Tagf("total", "%d", pl.Len())
}
span.Stop()
}()
}
if plFilter, err = s.store.Execute(ctx, opts.Query); err != nil {
return
}
if plFilter == nil {
return
}
err = pl.Intersect(plFilter)
}()
if opts.Order == nil || opts.Order.Index == nil {
var seriesList pbv1.SeriesList
var fieldResultList FieldResultList
if opts.Query != nil {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, opts.Query)
} else {
seriesList, fieldResultList, err = s.filter(ctx, series, opts.Projection, nil)
}
if err != nil {
return nil, nil, err
}
}

if opts.Order == nil || opts.Order.Index == nil {
sl, frl = filterSeriesList(seriesList, fieldResultList, pl)
return sl, frl, nil
return seriesList, fieldResultList, nil
}

fieldKey := index.FieldKey{
Expand All @@ -245,65 +226,49 @@ func (s *seriesIndex) Search(ctx context.Context, series []*pbv1.Series, opts In
span.Stop()
}()
}
seriesMatchers := make([]index.SeriesMatcher, len(series))
for i := range series {
seriesMatchers[i], err = convertEntityValuesToSeriesMatcher(series[i])
if err != nil {
return nil, nil, err
}
}
query, err := s.store.BuildQuery(seriesMatchers, opts.Query)
if err != nil {
return nil, nil, err
}
iter, err := s.store.Iterator(fieldKey, rangeOpts,
opts.Order.Sort, opts.PreloadSize)
opts.Order.Sort, opts.PreloadSize, query, opts.Projection)
if err != nil {
return nil, nil, err
}
defer func() {
err = multierr.Append(err, iter.Close())
}()

var sortedSeriesList pbv1.SeriesList
var sortedFieldResultList FieldResultList
var r int
result := make([]index.SeriesDocument, 0, 10)
for iter.Next() {
r++
docID := iter.Val().DocID
if !pl.Contains(docID) {
continue
}
sortedSeriesList, sortedFieldResultList = appendSeriesList(
sortedSeriesList, seriesList,
sortedFieldResultList, fieldResultList,
common.SeriesID(docID))
if err != nil {
return nil, nil, err
}
val := iter.Val()
var doc index.SeriesDocument
doc.Fields = maps.Clone(val.Values)
doc.Key.ID = common.SeriesID(val.DocID)
doc.Key.EntityValues = val.EntityValues
result = append(result, doc)
}
sortedSeriesList, sortedFieldResultList, err := convertIndexSeriesToSeriesList(result, len(opts.Projection) > 0)
if err != nil {
return nil, nil, errors.WithMessagef(err, "failed to convert index series to series list, matchers: %v, matched: %d", seriesMatchers, len(result))
}
if span != nil {
span.Tagf("query", "%s", iter.Query().String())
span.Tagf("rounds", "%d", r)
span.Tagf("size", "%d", len(sortedSeriesList))
}
return sortedSeriesList, sortedFieldResultList, err
}

func filterSeriesList(seriesList pbv1.SeriesList, fieldResultList FieldResultList, filter posting.List) (pbv1.SeriesList, FieldResultList) {
for i := 0; i < len(seriesList); i++ {
if !filter.Contains(uint64(seriesList[i].ID)) {
seriesList = append(seriesList[:i], seriesList[i+1:]...)
if fieldResultList != nil {
fieldResultList = append(fieldResultList[:i], fieldResultList[i+1:]...)
}
i--
}
}
return seriesList, fieldResultList
}

func appendSeriesList(dest, src pbv1.SeriesList, destFRL, srcFRL FieldResultList, target common.SeriesID) (pbv1.SeriesList, FieldResultList) {
for i := 0; i < len(src); i++ {
if target == src[i].ID {
dest = append(dest, src[i])
if srcFRL != nil {
destFRL = append(destFRL, srcFRL[i])
}
break
}
}
return dest, destFRL
}

func (s *seriesIndex) Close() error {
return s.store.Close()
}
2 changes: 1 addition & 1 deletion banyand/internal/storage/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
seriesQuery.EntityValues = tt.entityValues[i]
seriesQueries = append(seriesQueries, &seriesQuery)
}
sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
sl, _, err := si.filter(ctx, seriesQueries, nil, nil)
require.NoError(t, err)
require.Equal(t, len(tt.entityValues), len(sl))
assert.Equal(t, tt.subject, sl[0].Subject)
Expand Down
3 changes: 1 addition & 2 deletions banyand/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
Expand Down Expand Up @@ -67,7 +66,7 @@ type SupplyTSDB[T TSTable] func() T

// IndexSearchOpts is the options for searching index.
type IndexSearchOpts struct {
Query *inverted.Query
Query index.Query
Order *model.OrderBy
Projection []index.FieldKey
PreloadSize int
Expand Down
26 changes: 14 additions & 12 deletions pkg/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"fmt"
"io"

"github.com/blugelabs/bluge"

"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
Expand Down Expand Up @@ -97,11 +95,12 @@ func (r RangeOpts) Between(value []byte) int {

// DocumentResult represents a document in an index.
type DocumentResult struct {
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
EntityValues []byte
Values map[string][]byte
SortedValue []byte
SeriesID common.SeriesID
DocID uint64
Timestamp int64
}

// SortedField returns the value of the sorted field.
Expand All @@ -114,6 +113,7 @@ type FieldIterator[T sort.Comparable] interface {
Next() bool
Val() T
Close() error
Query() Query
}

// DummyFieldIterator never iterates.
Expand All @@ -133,6 +133,10 @@ func (i *dummyIterator) Close() error {
return nil
}

func (i *dummyIterator) Query() Query {
return nil
}

// Document represents a document in an index.
type Document struct {
Fields []Field
Expand All @@ -156,7 +160,8 @@ type Writer interface {

// FieldIterable allows building a FieldIterator.
type FieldIterable interface {
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int) (iter FieldIterator[*DocumentResult], err error)
BuildQuery(seriesMatchers []SeriesMatcher, secondaryQuery Query) (Query, error)
Iterator(fieldKey FieldKey, termRange RangeOpts, order modelv1.Sort, preLoadSize int, query Query, fieldKeys []FieldKey) (iter FieldIterator[*DocumentResult], err error)
Sort(sids []common.SeriesID, fieldKey FieldKey, order modelv1.Sort, timeRange *timestamp.TimeRange, preLoadSize int) (FieldIterator[*DocumentResult], error)
}

Expand All @@ -171,9 +176,7 @@ type Searcher interface {

// Query is an abstract of an index query.
type Query interface {
bluge.Query
fmt.Stringer
Query() bluge.Query
}

// Store is an abstract of an index repository.
Expand Down Expand Up @@ -204,8 +207,7 @@ type SeriesDocument struct {
type SeriesStore interface {
Store
// Search returns a list of series that match the given matchers.
Search(context.Context, []SeriesMatcher, []FieldKey) ([]SeriesDocument, error)
Execute(context.Context, Query) (posting.List, error)
Search(context.Context, []FieldKey, Query) ([]SeriesDocument, error)
}

// SeriesMatcherType represents the type of series matcher.
Expand Down
Loading

0 comments on commit 4588b55

Please sign in to comment.