Skip to content

Commit

Permalink
fix(blooms): Extract only line filters before line format expressions (
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored Mar 25, 2024
1 parent 51c54ad commit e4a5733
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 6 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}

filters := syntax.ExtractLineFilters(req.Plan.AST)
filters := v1.ExtractTestableLineFilters(req.Plan.AST)
g.metrics.receivedFilters.Observe(float64(len(filters)))

// Shortcut if request does not contain filters
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/util/constants"
)

Expand Down Expand Up @@ -73,7 +73,7 @@ func convertToShortRef(ref *logproto.ChunkRef) *logproto.ShortRef {

func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from, through model.Time, chunkRefs []*logproto.ChunkRef, queryPlan plan.QueryPlan) ([]*logproto.ChunkRef, error) {
// Shortcut that does not require any filtering
if len(chunkRefs) == 0 || len(syntax.ExtractLineFilters(queryPlan.AST)) == 0 {
if len(chunkRefs) == 0 || len(v1.ExtractTestableLineFilters(queryPlan.AST)) == 0 {
return chunkRefs, nil
}

Expand Down
33 changes: 33 additions & 0 deletions pkg/storage/bloom/v1/bloom_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,39 @@ func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefi
return true
}

// ExtractTestableLineFilters extracts all line filters from an expression
// that can be tested against a bloom filter. This will skip any line filters
// after a line format expression. A line format expression might add content
// that the query later matches against, which can't be tested with a bloom filter.
// E.g. For {app="fake"} |= "foo" | line_format "thisNewTextShouldMatch" |= "thisNewTextShouldMatch"
// this function will return only the line filter for "foo" since the line filter for "thisNewTextShouldMatch"
// wouldn't match against the bloom filter but should match against the query.
func ExtractTestableLineFilters(expr syntax.Expr) []syntax.LineFilterExpr {
if expr == nil {
return nil
}

var filters []syntax.LineFilterExpr
var lineFmtFound bool
visitor := &syntax.DepthFirstTraversal{
VisitLineFilterFn: func(v syntax.RootVisitor, e *syntax.LineFilterExpr) {
if e != nil && !lineFmtFound {
filters = append(filters, *e)
}
},
VisitLineFmtFn: func(v syntax.RootVisitor, e *syntax.LineFmtExpr) {
if e != nil {
lineFmtFound = true
}
},
}
expr.Accept(visitor)
return filters
}

// FiltersToBloomTest converts a list of line filters to a BloomTest.
// Note that all the line filters should be testable against a bloom filter.
// Use ExtractTestableLineFilters to extract testable line filters from an expression.
// TODO(owen-d): limits the number of bloom lookups run.
// An arbitrarily high number can overconsume cpu and is a DoS vector.
func FiltersToBloomTest(b NGramBuilder, filters ...syntax.LineFilterExpr) BloomTest {
Expand Down
9 changes: 7 additions & 2 deletions pkg/storage/bloom/v1/bloom_tester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,14 +154,19 @@ func TestFiltersToBloomTests(t *testing.T) {
bloom: fakeBloom{"foo", "bar", "baz", "fuzz", "noz"},
expectMatch: false,
},
{
name: "line filter after line format",
query: `{app="fake"} |= "foo" | line_format "thisNewTextShouldMatch" |= "thisNewTextShouldMatch"`,
bloom: fakeBloom{"foo"},
expectMatch: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
expr, err := syntax.ParseExpr(tc.query)
assert.NoError(t, err)
filters := syntax.ExtractLineFilters(expr)
filters := ExtractTestableLineFilters(expr)

bloomTests := FiltersToBloomTest(fakeNgramBuilder{}, filters...)

assert.Equal(t, tc.expectMatch, bloomTests.Matches(tc.bloom))
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/querier/plan"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores"
Expand Down Expand Up @@ -229,7 +230,7 @@ func (g *Gateway) GetChunkRef(ctx context.Context, req *logproto.GetChunkRefRequ

// Extract LineFiltersExpr from the plan. If there is none, we can short-circuit and return before making a req
// to the bloom-gateway (through the g.bloomQuerier)
if len(syntax.ExtractLineFilters(req.Plan.AST)) == 0 {
if len(v1.ExtractTestableLineFilters(req.Plan.AST)) == 0 {
return result, nil
}

Expand Down

0 comments on commit e4a5733

Please sign in to comment.