From 9f4267be07cdd16d3e7f594c4fa8e1e186f566b2 Mon Sep 17 00:00:00 2001
From: Ben Clive
Date: Fri, 10 May 2024 17:52:32 +0100
Subject: [PATCH] feat: Store structured metadata with patterns

---
 pkg/pattern/drain/chunk.go       | 40 ++++++++++++++++++++++++++------
 pkg/pattern/drain/chunk_test.go  | 19 ++++++++-------
 pkg/pattern/drain/drain.go       | 15 ++++++------
 pkg/pattern/drain/drain_test.go  |  7 +++---
 pkg/pattern/drain/log_cluster.go | 10 ++++----
 pkg/pattern/ingester_querier.go  | 17 +++++++++++++-
 pkg/pattern/instance.go          | 26 +++++++++++++++++++--
 pkg/pattern/stream.go            |  7 +++---
 8 files changed, 106 insertions(+), 35 deletions(-)

diff --git a/pkg/pattern/drain/chunk.go b/pkg/pattern/drain/chunk.go
index 9b1e34e2e3a19..a4dbbb20c6f5c 100644
--- a/pkg/pattern/drain/chunk.go
+++ b/pkg/pattern/drain/chunk.go
@@ -4,9 +4,12 @@ import (
 	"sort"
 	"time"
 
+	"github.com/grafana/loki/pkg/push"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
 )
 
@@ -21,16 +24,20 @@ const (
 
 type Chunks []Chunk
 
 type Chunk struct {
-	Samples []logproto.PatternSample
+	Samples []logproto.PatternSample
+	StructuredMetadata map[string]string
 }
 
-func newChunk(ts model.Time) Chunk {
+func newChunk(ts model.Time, structuredMetadata push.LabelsAdapter) Chunk {
 	maxSize := int(maxChunkTime.Nanoseconds()/TimeResolution.UnixNano()) + 1
-	v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize)}
+	v := Chunk{Samples: make([]logproto.PatternSample, 1, maxSize), StructuredMetadata: make(map[string]string, 32)}
 	v.Samples[0] = logproto.PatternSample{
 		Timestamp: ts,
 		Value: 1,
 	}
+	for _, lbl := range structuredMetadata {
+		v.StructuredMetadata[lbl.Name] = lbl.Value
+	}
 	return v
 }
 
@@ -94,31 +101,50 @@ func (c Chunk) ForRange(start, end, step model.Time) []logproto.PatternSample {
 	return aggregatedSamples
 }
 
-func (c *Chunks) Add(ts model.Time) {
+func (c *Chunks) Add(ts model.Time, metadata push.LabelsAdapter) {
 	t := truncateTimestamp(ts, TimeResolution)
 
 	if len(*c) == 0 {
-		*c = append(*c, newChunk(t))
+		*c = append(*c, newChunk(t, metadata))
 		return
 	}
 	last := &(*c)[len(*c)-1]
 	if last.Samples[len(last.Samples)-1].Timestamp == t {
 		last.Samples[len(last.Samples)-1].Value++
+		for _, md := range metadata {
+			last.StructuredMetadata[md.Name] = md.Value
+		}
 		return
 	}
 	if !last.spaceFor(t) {
-		*c = append(*c, newChunk(t))
+		*c = append(*c, newChunk(t, metadata))
 		return
 	}
 	last.Samples = append(last.Samples, logproto.PatternSample{
 		Timestamp: t,
 		Value: 1,
 	})
+	for _, md := range metadata {
+		last.StructuredMetadata[md.Name] = md.Value
+	}
 }
 
-func (c Chunks) Iterator(pattern string, from, through, step model.Time) iter.Iterator {
+func (c Chunks) Iterator(pattern string, from, through, step model.Time, labelFilters log.StreamPipeline) iter.Iterator {
 	iters := make([]iter.Iterator, 0, len(c))
+	var emptyLine []byte
 	for _, chunk := range c {
+		chunkMetadata := make([]labels.Label, 0, len(chunk.StructuredMetadata))
+		for k, v := range chunk.StructuredMetadata {
+			chunkMetadata = append(chunkMetadata, labels.Label{
+				Name: k,
+				Value: v,
+			})
+		}
+		_, _, matches := labelFilters.Process(0, emptyLine, chunkMetadata...)
+		if !matches {
+			continue
+		}
+
 		samples := chunk.ForRange(from, through, step)
 		if len(samples) == 0 {
 			continue
diff --git a/pkg/pattern/drain/chunk_test.go b/pkg/pattern/drain/chunk_test.go
index 4863a6629729a..a59d534fc0798 100644
--- a/pkg/pattern/drain/chunk_test.go
+++ b/pkg/pattern/drain/chunk_test.go
@@ -5,30 +5,33 @@ import (
 	"testing"
 	"time"
 
+	"github.com/grafana/loki/pkg/push"
 	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 )
 
+var metadata = push.LabelsAdapter{}
+
 func TestAdd(t *testing.T) {
 	cks := Chunks{}
-	cks.Add(TimeResolution + 1)
-	cks.Add(TimeResolution + 2)
-	cks.Add(2*TimeResolution + 1)
+	cks.Add(TimeResolution+1, metadata)
+	cks.Add(TimeResolution+2, metadata)
+	cks.Add(2*TimeResolution+1, metadata)
 	require.Equal(t, 1, len(cks))
 	require.Equal(t, 2, len(cks[0].Samples))
-	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
+	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds())+TimeResolution+1, metadata)
 	require.Equal(t, 2, len(cks))
 	require.Equal(t, 1, len(cks[1].Samples))
 }
 
 func TestIterator(t *testing.T) {
 	cks := Chunks{}
-	cks.Add(TimeResolution + 1)
-	cks.Add(TimeResolution + 2)
-	cks.Add(2*TimeResolution + 1)
-	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds()) + TimeResolution + 1)
+	cks.Add(TimeResolution+1, metadata)
+	cks.Add(TimeResolution+2, metadata)
+	cks.Add(2*TimeResolution+1, metadata)
+	cks.Add(model.TimeFromUnixNano(time.Hour.Nanoseconds())+TimeResolution+1, metadata)
 	it := cks.Iterator("test", model.Time(0), model.Time(time.Hour.Nanoseconds()), TimeResolution)
 	require.NotNil(t, it)
diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go
index ade8fca366b8a..cd90294f931ac 100644
--- a/pkg/pattern/drain/drain.go
+++ b/pkg/pattern/drain/drain.go
@@ -28,6 +28,7 @@ import (
 	"strings"
 	"unicode"
 
+	"github.com/grafana/loki/pkg/push"
 	"github.com/hashicorp/golang-lru/v2/simplelru"
 	"github.com/prometheus/common/model"
 
@@ -171,15 +172,15 @@ func (d *Drain) Clusters() []*LogCluster {
 	return d.idToCluster.Values()
 }
 
-func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
-	return d.train(tokens, stringer, ts)
+func (d *Drain) TrainTokens(tokens []string, stringer func([]string) string, ts int64, metadata push.LabelsAdapter) *LogCluster {
+	return d.train(tokens, stringer, ts, metadata)
 }
 
-func (d *Drain) Train(content string, ts int64) *LogCluster {
-	return d.train(d.getContentAsTokens(content), nil, ts)
+func (d *Drain) Train(content string, ts int64, metadata push.LabelsAdapter) *LogCluster {
+	return d.train(d.getContentAsTokens(content), nil, ts, metadata)
 }
 
-func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64) *LogCluster {
+func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64, metadata push.LabelsAdapter) *LogCluster {
 	matchCluster := d.treeSearch(d.rootNode, tokens, d.config.SimTh, false)
 	// Match no existing log cluster
 	if matchCluster == nil {
@@ -192,13 +193,13 @@ func (d *Drain) train(tokens []string, stringer func([]string) string, ts int64)
 			Stringer: stringer,
 			Chunks: Chunks{},
 		}
-		matchCluster.append(model.TimeFromUnixNano(ts))
+		matchCluster.append(model.TimeFromUnixNano(ts), metadata)
 		d.idToCluster.Set(clusterID, matchCluster)
 		d.addSeqToPrefixTree(d.rootNode, matchCluster)
 	} else {
 		newTemplateTokens := d.createTemplate(tokens, matchCluster.Tokens)
 		matchCluster.Tokens = newTemplateTokens
-		matchCluster.append(model.TimeFromUnixNano(ts))
+		matchCluster.append(model.TimeFromUnixNano(ts), metadata)
 		// Touch cluster to update its state in the cache.
 		d.idToCluster.Get(matchCluster.id)
 	}
diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go
index ef7754c4ed57e..238e2f05e05cb 100644
--- a/pkg/pattern/drain/drain_test.go
+++ b/pkg/pattern/drain/drain_test.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"testing"
 
+	"github.com/grafana/loki/pkg/push"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/logql/log/pattern"
@@ -106,7 +107,7 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) {
 			scanner := bufio.NewScanner(file)
 			for scanner.Scan() {
 				line := scanner.Text()
-				tt.drain.Train(line, 0)
+				tt.drain.Train(line, 0, push.LabelsAdapter{})
 			}
 
 			var output []string
@@ -152,7 +153,7 @@ func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) {
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
 			for _, line := range tt.inputLines {
-				tt.drain.Train(line, 0)
+				tt.drain.Train(line, 0, push.LabelsAdapter{})
 			}
 			t.Log("Learned clusters", tt.drain.Clusters())
 
@@ -217,7 +218,7 @@ func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T)
 		tt := tt
 		t.Run(tt.name, func(t *testing.T) {
 			for _, line := range tt.inputLines {
-				tt.drain.Train(line, 0)
+				tt.drain.Train(line, 0, push.LabelsAdapter{})
 			}
 			require.Equal(t, 1, len(tt.drain.Clusters()))
 			cluster := tt.drain.Clusters()[0]
diff --git a/pkg/pattern/drain/log_cluster.go b/pkg/pattern/drain/log_cluster.go
index af5932d16f706..4c1d3328a9dbe 100644
--- a/pkg/pattern/drain/log_cluster.go
+++ b/pkg/pattern/drain/log_cluster.go
@@ -4,9 +4,11 @@ import (
 	"strings"
 	"time"
 
+	"github.com/grafana/loki/pkg/push"
 	"github.com/prometheus/common/model"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
 )
 
@@ -25,9 +27,9 @@ func (c *LogCluster) String() string {
 	return strings.Join(c.Tokens, " ")
 }
 
-func (c *LogCluster) append(ts model.Time) {
+func (c *LogCluster) append(ts model.Time, metadata push.LabelsAdapter) {
 	c.Size++
-	c.Chunks.Add(ts)
+	c.Chunks.Add(ts, metadata)
 }
 
 func (c *LogCluster) merge(samples []*logproto.PatternSample) {
@@ -35,8 +37,8 @@ func (c *LogCluster) merge(samples []*logproto.PatternSample) {
 	c.Chunks.merge(samples)
 }
 
-func (c *LogCluster) Iterator(from, through, step model.Time) iter.Iterator {
-	return c.Chunks.Iterator(c.String(), from, through, step)
+func (c *LogCluster) Iterator(from, through, step model.Time, labelFilters log.StreamPipeline) iter.Iterator {
+	return c.Chunks.Iterator(c.String(), from, through, step, labelFilters)
 }
 
 func (c *LogCluster) Samples() []*logproto.PatternSample {
diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go
index bfbaeb92aedbc..7fb5b2330816d 100644
--- a/pkg/pattern/ingester_querier.go
+++ b/pkg/pattern/ingester_querier.go
@@ -8,6 +8,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/httpgrpc"
 	"github.com/grafana/dskit/ring"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
@@ -44,10 +45,24 @@ func NewIngesterQuerier(
 }
 
 func (q *IngesterQuerier) Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error) {
-	_, err := syntax.ParseMatchers(req.Query, true)
+	expr, err := syntax.ParseLogSelector(req.Query, true)
 	if err != nil {
 		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 	}
+	var queryErr error
+	expr.Walk(func(treeExpr syntax.Expr) {
+		switch treeExpr.(type) {
+		case *syntax.MatchersExpr: // Permit
+		case *syntax.PipelineExpr: // Permit
+		case *syntax.LabelFilterExpr: // Permit
+		default:
+			queryErr = errors.New("only label filters are allowed")
+		}
+	})
+	if queryErr != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, queryErr.Error())
+	}
+
 	resps, err := q.forAllIngesters(ctx, func(_ context.Context, client logproto.PatternClient) (interface{}, error) {
 		return client.Query(ctx, req)
 	})
diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go
index 7ac0099edec36..7cfe6852f4d42 100644
--- a/pkg/pattern/instance.go
+++ b/pkg/pattern/instance.go
@@ -8,6 +8,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/httpgrpc"
 	"github.com/grafana/dskit/multierror"
+	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 
@@ -74,10 +75,31 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
 
 // Iterator returns an iterator of pattern samples matching the given query patterns request.
 func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequest) (iter.Iterator, error) {
-	matchers, err := syntax.ParseMatchers(req.Query, true)
+	expr, err := syntax.ParseLogSelector(req.Query, true)
 	if err != nil {
 		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
 	}
+	var queryErr error
+	expr.Walk(func(treeExpr syntax.Expr) {
+		switch treeExpr.(type) {
+		case *syntax.MatchersExpr: // Permit
+		case *syntax.PipelineExpr: // Permit
+		case *syntax.LabelFilterExpr: // Permit
+		default:
+			queryErr = errors.New("only label filters are allowed")
+		}
+	})
+	if queryErr != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, queryErr.Error())
+	}
+
+	pipeline, err := expr.Pipeline()
+	if err != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+	}
+
+	matchers := expr.Matchers()
+
 	from, through := util.RoundToMilliseconds(req.Start, req.End)
 	step := model.Time(req.Step)
 	if step < drain.TimeResolution {
@@ -86,7 +108,7 @@ func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequ
 
 	var iters []iter.Iterator
 	err = i.forMatchingStreams(matchers, func(s *stream) error {
-		iter, err := s.Iterator(ctx, from, through, step)
+		iter, err := s.Iterator(ctx, from, through, step, pipeline.ForStream(s.labels))
 		if err != nil {
 			return err
 		}
diff --git a/pkg/pattern/stream.go b/pkg/pattern/stream.go
index 8321fce9f647c..a0160f04945f8 100644
--- a/pkg/pattern/stream.go
+++ b/pkg/pattern/stream.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/logql/log"
 	"github.com/grafana/loki/v3/pkg/pattern/drain"
 	"github.com/grafana/loki/v3/pkg/pattern/iter"
 
@@ -49,12 +50,12 @@ func (s *stream) Push(
 			continue
 		}
 		s.lastTs = entry.Timestamp.UnixNano()
-		s.patterns.Train(entry.Line, entry.Timestamp.UnixNano())
+		s.patterns.Train(entry.Line, entry.Timestamp.UnixNano(), entry.StructuredMetadata)
 	}
 	return nil
 }
 
-func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (iter.Iterator, error) {
+func (s *stream) Iterator(_ context.Context, from, through, step model.Time, labelFilters log.StreamPipeline) (iter.Iterator, error) {
 	// todo we should improve locking.
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
@@ -66,7 +67,7 @@ func (s *stream) Iterator(_ context.Context, from, through, step model.Time) (it
 		if cluster.String() == "" {
 			continue
 		}
-		iters = append(iters, cluster.Iterator(from, through, step))
+		iters = append(iters, cluster.Iterator(from, through, step, labelFilters))
 	}
 	return iter.NewMerge(iters...), nil
 }
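
Illustrative sketch (not part of the diff above): the snippet below pieces together the query-side flow this change introduces, using only calls that already appear in the patch (syntax.ParseLogSelector, Expr.Walk, Pipeline, ForStream, Process). The query string and the label names service_name and detected_level are invented for the example; treat it as a sketch of the intended flow rather than code from the Loki tree. A pattern query is parsed, restricted to matchers and label filters, compiled into a per-stream pipeline, and then evaluated against the structured metadata stored on a chunk, which is what the new Chunks.Iterator does before emitting samples.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/logql/syntax"
)

func main() {
	// A pattern query that carries a structured-metadata label filter.
	// Query and label names are made up for this example.
	expr, err := syntax.ParseLogSelector(`{service_name="foo"} | detected_level="error"`, true)
	if err != nil {
		panic(err)
	}

	// Reject anything other than matchers and label filters, mirroring the
	// Walk check added to ingester_querier.go and instance.go.
	var queryErr error
	expr.Walk(func(e syntax.Expr) {
		switch e.(type) {
		case *syntax.MatchersExpr, *syntax.PipelineExpr, *syntax.LabelFilterExpr: // permitted
		default:
			queryErr = fmt.Errorf("only label filters are allowed")
		}
	})
	if queryErr != nil {
		panic(queryErr)
	}

	// Build the per-stream pipeline and run it against the structured
	// metadata stored on a chunk, as Chunks.Iterator now does; the line
	// content is irrelevant here, so an empty line is passed.
	pipeline, err := expr.Pipeline()
	if err != nil {
		panic(err)
	}
	sp := pipeline.ForStream(labels.FromStrings("service_name", "foo"))

	chunkMetadata := []labels.Label{{Name: "detected_level", Value: "error"}}
	_, _, matches := sp.Process(0, nil, chunkMetadata...)
	fmt.Println("chunk matches label filter:", matches)
}

Keeping the metadata per chunk rather than per sample keeps the storage overhead small while still letting a filter such as | detected_level="error" skip whole chunks whose stored metadata never matched.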