Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Store structured metadata with patterns #12936

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 33 additions & 7 deletions pkg/pattern/drain/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"sort"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/pattern/iter"
)

Expand All @@ -21,16 +24,20 @@ const (
type Chunks []Chunk

type Chunk struct {
Samples []logproto.PatternSample
Samples []logproto.PatternSample
StructuredMetadata map[string]string
}

func newChunk(ts model.Time) Chunk {
func newChunk(ts model.Time, structuredMetadata push.LabelsAdapter) Chunk {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not confident I am using the right types to pass the labels through from the request - any recommendations here?

maxSize := int(maxChunkTime.Nanoseconds()/TimeResolution.UnixNano()) + 1
v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize)}
v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize), StructuredMetadata: make(map[string]string, 32)}
v.Samples[0] = logproto.PatternSample{
Timestamp: ts,
Value: 1,
}
for _, lbl := range structuredMetadata {
v.StructuredMetadata[lbl.Name] = lbl.Value
}
return v
}

Expand Down Expand Up @@ -94,31 +101,50 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
return aggregatedSamples
}

func (c *Chunks) Add(ts model.Time) {
func (c *Chunks) Add(ts model.Time, metadata push.LabelsAdapter) {
t := truncateTimestamp(ts, TimeResolution)

if len(*c) == 0 {
*c = append(*c, newChunk(t))
*c = append(*c, newChunk(t, metadata))
return
}
last := &(*c)[len(*c)-1]
if last.Samples[len(last.Samples)-1].Timestamp == t {
last.Samples[len(last.Samples)-1].Value++
for _, md := range metadata {
last.StructuredMetadata[md.Name] = md.Value
}
return
}
if !last.spaceFor(t) {
*c = append(*c, newChunk(t))
*c = append(*c, newChunk(t, metadata))
return
}
last.Samples = append(last.Samples, logproto.PatternSample{
Timestamp: t,
Value: 1,
})
for _, md := range metadata {
last.StructuredMetadata[md.Name] = md.Value
}
}

func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator {
func (c Chunks) Iterator(pattern string, from, through, step model.Time, labelFilters log.StreamPipeline) iter.Iterator {
iters := make([]iter.Iterator, 0, len(c))
var emptyLine []byte
for _, chunk := range c {
chunkMetadata := make([]labels.Label, 0, len(chunk.StructuredMetadata))
for k, v := range chunk.StructuredMetadata {
chunkMetadata = append(chunkMetadata, labels.Label{
Name: k,
Value: v,
})
}
_, _, matches := labelFilters.Process(0, emptyLine, chunkMetadata...)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a little hacky since I'm passing empty TS and Line, but it enables re-use of the existing LogQL logic to implement the matching so I think its the best option.

if !matches {
continue
}

samples := chunk.ForRange(from, through, step)
if len(samples) == 0 {
continue
Expand Down
19 changes: 11 additions & 8 deletions pkg/pattern/drain/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,33 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logproto"
)

var metadata = push.LabelsAdapter{}

// TestAdd verifies that samples landing within maxChunkTime accumulate in a
// single chunk (with equal truncated timestamps collapsing into one sample),
// and that a timestamp an hour later rolls over to a second chunk.
func TestAdd(t *testing.T) {
	cks := Chunks{}
	cks.Add(TimeResolution+1, metadata)
	cks.Add(TimeResolution+2, metadata)
	cks.Add(2*TimeResolution+1, metadata)
	require.Equal(t, 1, len(cks))
	require.Equal(t, 2, len(cks[0].Samples))
	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds())+TimeResolution+1, metadata)
	require.Equal(t, 2, len(cks))
	require.Equal(t, 1, len(cks[1].Samples))
}

func TestIterator(t *testing.T) {
cks := Chunks{}
cks.Add(TimeResolution + 1)
cks.Add(TimeResolution + 2)
cks.Add(2*TimeResolution + 1)
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
cks.Add(TimeResolution+1, metadata)
cks.Add(TimeResolution+2, metadata)
cks.Add(2*TimeResolution+1, metadata)
cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds())+TimeResolution+1, metadata)

it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), TimeResolution)
require.NotNil(t, it)
Expand Down
15 changes: 8 additions & 7 deletions pkg/pattern/drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"unicode"

"github.com/grafana/loki/pkg/push"
"github.com/hashicorp/golang-lru/v2/simplelru"
"github.com/prometheus/common/model"

Expand Down Expand Up @@ -171,15 +172,15 @@ func (d *Drain) Clusters() []*LogCluster {
return d.idToCluster.Values()
}

func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
return d.train(tokens, stringer, ts)
func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts int64, metadata push.LabelsAdapter) *LogCluster {
return d.train(tokens, stringer, ts, metadata)
}

func (d *Drain) Train(content string, ts int64) *LogCluster {
return d.train(d.getContentAsTokens(content), nil, ts)
func (d *Drain) Train(content string, ts int64, metadata push.LabelsAdapter) *LogCluster {
return d.train(d.getContentAsTokens(content), nil, ts, metadata)
}

func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64, metadata push.LabelsAdapter) *LogCluster {
matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
// Match no existing log cluster
if matchCluster == nil {
Expand All @@ -192,13 +193,13 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
Stringer: stringer,
Chunks: Chunks{},
}
matchCluster.append(model.TimeFromUnixNano(ts))
matchCluster.append(model.TimeFromUnixNano(ts), metadata)
d.idToCluster.Set(clusterID, matchCluster)
d.addSeqToPrefixTree(d.rootNode, matchCluster)
} else {
newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
matchCluster.Tokens = newTemplateTokens
matchCluster.append(model.TimeFromUnixNano(ts))
matchCluster.append(model.TimeFromUnixNano(ts), metadata)
// Touch cluster to update its state in the cache.
d.idToCluster.Get(matchCluster.id)
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/pattern/drain/drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"testing"

"github.com/grafana/loki/pkg/push"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/logql/log/pattern"
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
tt.drain.Train(line, 0)
tt.drain.Train(line, 0, push.LabelsAdapter{})
}

var output []string
Expand Down Expand Up @@ -152,7 +153,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
tt.drain.Train(line, 0, push.LabelsAdapter{})
}
t.Log("Learned clusters", tt.drain.Clusters())

Expand Down Expand Up @@ -217,7 +218,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
tt := tt
t.Run(tt.name, func(t *testing.T) {
for _, line := range tt.inputLines {
tt.drain.Train(line, 0)
tt.drain.Train(line, 0, push.LabelsAdapter{})
}
require.Equal(t, 1, len(tt.drain.Clusters()))
cluster := tt.drain.Clusters()[0]
Expand Down
10 changes: 6 additions & 4 deletions pkg/pattern/drain/log_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"strings"
"time"

"github.com/grafana/loki/pkg/push"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/pattern/iter"
)

Expand All @@ -25,18 +27,18 @@ func (c *LogCluster) String() string {
return strings.Join(c.Tokens, " ")
}

func (c *LogCluster) append(ts model.Time) {
func (c *LogCluster) append(ts model.Time, metadata push.LabelsAdapter) {
c.Size++
c.Chunks.Add(ts)
c.Chunks.Add(ts, metadata)
}

// merge folds the given pattern samples into the cluster's chunks, growing
// the cluster size by the total sample count being merged.
func (c *LogCluster) merge(samples []*logproto.PatternSample) {
	added := int(sumSize(samples))
	c.Size += added
	c.Chunks.merge(samples)
}

func (c *LogCluster) Iterator(from, through, step model.Time) iter.Iterator {
return c.Chunks.Iterator(c.String(), from, through, step)
func (c *LogCluster) Iterator(from, through, step model.Time, labelFilters log.StreamPipeline) iter.Iterator {
return c.Chunks.Iterator(c.String(), from, through, step, labelFilters)
}

func (c *LogCluster) Samples() []*logproto.PatternSample {
Expand Down
17 changes: 16 additions & 1 deletion pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/ring"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/v3/pkg/logproto"
Expand Down Expand Up @@ -44,10 +45,24 @@ func NewIngesterQuerier(
}

func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
_, err := syntax.ParseMatchers(req.Query, true)
expr, err := syntax.ParseLogSelector(req.Query, true)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
var queryErr error
expr.Walk(func(treeExpr syntax.Expr) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put this here to guard against anyone passing in more complex queries to this API. Only allowing a subset of LogQL is a bit of an odd use case so I'd appreciate any input on whether there's a better way to do this.

switch treeExpr.(type) {
case *syntax.MatchersExpr: // Permit
case *syntax.PipelineExpr: // Permit
case *syntax.LabelFilterExpr: // Permit
default:
queryErr = errors.New("only label filters are allowed")
}
})
if queryErr != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, queryErr.Error())
}

resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) {
return client.Query(ctx, req)
})
Expand Down
26 changes: 24 additions & 2 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

Expand Down Expand Up @@ -74,10 +75,31 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {

// Iterator returns an iterator of pattern samples matching the given query patterns request.
func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequest) (iter.Iterator, error) {
matchers, err := syntax.ParseMatchers(req.Query, true)
expr, err := syntax.ParseLogSelector(req.Query, true)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
var queryErr error
expr.Walk(func(treeExpr syntax.Expr) {
switch treeExpr.(type) {
case *syntax.MatchersExpr: // Permit
case *syntax.PipelineExpr: // Permit
case *syntax.LabelFilterExpr: // Permit
default:
queryErr = errors.New("only label filters are allowed")
}
})
if queryErr != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, queryErr.Error())
}

pipeline, err := expr.Pipeline()
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

matchers := expr.Matchers()

from, through := util.RoundToMilliseconds(req.Start, req.End)
step := model.Time(req.Step)
if step < drain.TimeResolution {
Expand All @@ -86,7 +108,7 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ

var iters []iter.Iterator
err = i.forMatchingStreams(matchers, func(s *stream) error {
iter, err := s.Iterator(ctx, from, through, step)
iter, err := s.Iterator(ctx, from, through, step, pipeline.ForStream(s.labels))
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/pattern/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/pattern/drain"
"github.com/grafana/loki/v3/pkg/pattern/iter"

Expand Down Expand Up @@ -49,12 +50,12 @@ func (s *stream) Push(
continue
}
s.lastTs = entry.Timestamp.UnixNano()
s.patterns.Train(entry.Line, entry.Timestamp.UnixNano())
s.patterns.Train(entry.Line, entry.Timestamp.UnixNano(), entry.StructuredMetadata)
}
return nil
}

func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) {
func (s *stream) Iterator(_ context.Context, from, through, step model.Time, labelFilters log.StreamPipeline) (iter.Iterator, error) {
// todo we should improve locking.
s.mtx.Lock()
defer s.mtx.Unlock()
Expand All @@ -66,7 +67,7 @@ func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (it
if cluster.String() == "" {
continue
}
iters = append(iters, cluster.Iterator(from, through, step))
iters = append(iters, cluster.Iterator(from, through, step, labelFilters))
}
return iter.NewMerge(iters...), nil
}
Expand Down
Loading