Skip to content

Commit

Permalink
Merge branch 'main' into helm/overrides-exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
bentonam authored Jan 9, 2025
2 parents e94213f + b5c627b commit f0e52e6
Show file tree
Hide file tree
Showing 29 changed files with 508 additions and 188 deletions.
4 changes: 0 additions & 4 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,6 @@ block_builder:
[scheduler_grpc_client_config: <grpc_client>]

block_scheduler:
# Consumer group used by block scheduler to track the last consumed offset.
# CLI flag: -block-scheduler.consumer-group
[consumer_group: <string> | default = "block-scheduler"]

# How often the scheduler should plan jobs.
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 15m]
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-autorest/autorest/adal v0.9.24
github.com/Azure/go-autorest/autorest/azure/auth v0.5.13
github.com/IBM/sarama v1.44.0
github.com/IBM/sarama v1.45.0
github.com/Masterminds/sprig/v3 v3.3.0
github.com/NYTimes/gziphandler v1.1.1
github.com/Workiva/go-datastructures v1.1.5
Expand Down Expand Up @@ -116,7 +116,7 @@ require (
github.com/DmitriyVTitov/size v1.5.0
github.com/IBM/go-sdk-core/v5 v5.18.3
github.com/IBM/ibm-cos-sdk-go v1.12.0
github.com/axiomhq/hyperloglog v0.2.2
github.com/axiomhq/hyperloglog v0.2.3
github.com/buger/jsonparser v1.1.1
github.com/d4l3k/messagediff v1.2.1
github.com/dolthub/swiss v0.2.1
Expand Down Expand Up @@ -147,7 +147,7 @@ require (
go4.org/netipx v0.0.0-20230125063823-8449b0a6169f
golang.org/x/oauth2 v0.25.0
golang.org/x/text v0.21.0
google.golang.org/protobuf v1.36.1
google.golang.org/protobuf v1.36.2
gotest.tools v2.2.0+incompatible
k8s.io/apimachinery v0.32.0
k8s.io/utils v0.0.0-20241104163129-6fe5fd82f078
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ github.com/IBM/go-sdk-core/v5 v5.18.3 h1:q6IDU3N2bHGwijK9pMnzKC5gqdaRII56NzB4ZNd
github.com/IBM/go-sdk-core/v5 v5.18.3/go.mod h1:5kILxqEWOrwMhoD2b7J6Xv9Z2M6YIdT/6Oy+XRSsCGQ=
github.com/IBM/ibm-cos-sdk-go v1.12.0 h1:Wrk3ve4JS3euhl7XjNFd3RlvPT56199G2/rKaPWpRKU=
github.com/IBM/ibm-cos-sdk-go v1.12.0/go.mod h1:v/VBvFuysZMIX9HcaIrz6a+FLVw9px8fq6XabFwD+E4=
github.com/IBM/sarama v1.44.0 h1:puNKqcScjSAgVLramjsuovZrS0nJZFVsrvuUymkWqhE=
github.com/IBM/sarama v1.44.0/go.mod h1:MxQ9SvGfvKIorbk077Ff6DUnBlGpidiQOtU2vuBaxVw=
github.com/IBM/sarama v1.45.0 h1:IzeBevTn809IJ/dhNKhP5mpxEXTmELuezO2tgHD9G5E=
github.com/IBM/sarama v1.45.0/go.mod h1:EEay63m8EZkeumco9TDXf2JT3uDnZsZqFgV46n4yZdY=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573 h1:DCPjdUAi+jcGnL7iN+A7uNY8xG584oMRuisYh/VE21E=
github.com/MasslessParticle/azure-storage-blob-go v0.14.1-0.20240322194317-344980fda573/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
Expand Down Expand Up @@ -222,8 +222,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.4 h1:yDxvkz3/uOKfxnv8YhzOi9m+2OGI
github.com/aws/aws-sdk-go-v2/service/sts v1.32.4/go.mod h1:9XEUty5v5UAsMiFOBJrNibZgwCeOma73jgGwwhgffa8=
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/axiomhq/hyperloglog v0.2.2 h1:9X9rOdYx82zXKgd1aMsDZNUw3d7DKAHhd2J305HZPA8=
github.com/axiomhq/hyperloglog v0.2.2/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM=
github.com/axiomhq/hyperloglog v0.2.3 h1:2ZGwz3FGcx77e9/aNjqJijsGhH6RZOlglzxnDpVBCQY=
github.com/axiomhq/hyperloglog v0.2.3/go.mod h1:DLUK9yIzpU5B6YFLjxTIcbHu1g4Y1WQb1m5RH3radaM=
github.com/baidubce/bce-sdk-go v0.9.212 h1:B3PUoaFi4m13wP7gWObznjPLZ5umQ1BHjO/UoSsj3x4=
github.com/baidubce/bce-sdk-go v0.9.212/go.mod h1:zbYJMQwE4IZuyrJiFO8tO8NbtYiKTFTbwh4eIsqjVdg=
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps=
Expand Down Expand Up @@ -1653,8 +1653,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
14 changes: 9 additions & 5 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
completion.Success = false
}

// remove from inflight jobs to stop sending sync requests
i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

if _, err := withBackoff(
ctx,
i.cfg.Backoff,
Expand All @@ -292,16 +298,12 @@ func (i *BlockBuilder) runOne(ctx context.Context, c *kgo.Client, workerID strin
return true, err
}

i.jobsMtx.Lock()
delete(i.inflightJobs, job.ID())
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

return true, err
}

func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types.Job, logger log.Logger) (lastOffsetConsumed int64, err error) {
level.Debug(logger).Log("msg", "beginning job")
start := time.Now()

indexer := newTsdbCreator()
appender := newAppender(i.id,
Expand Down Expand Up @@ -505,6 +507,8 @@ func (i *BlockBuilder) processJob(ctx context.Context, c *kgo.Client, job *types
level.Info(logger).Log(
"msg", "successfully processed job",
"last_offset", lastOffset,
"duration", time.Since(start),
"records", lastOffset-job.Offsets().Min,
)

return lastOffset, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
case types.JobStatusInProgress:
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
// Job already completed, re-enqueue a new one
level.Warn(q.logger).Log("msg", "job already completed, re-enqueuing", "job", jobID, "status", jobMeta.Status)
registerInProgress()
return
default:
Expand Down
2 changes: 0 additions & 2 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
Expand All @@ -36,7 +35,6 @@ type Config struct {

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Interval, prefix+"interval", 15*time.Minute, "How often the scheduler should plan jobs.")
f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
f.DurationVar(&cfg.LookbackPeriod, prefix+"lookback-period", 0, "Lookback period used by the scheduler to plan jobs when the consumer group has no commits. 0 consumes from the start of the partition.")
f.StringVar(
&cfg.Strategy,
Expand Down
7 changes: 5 additions & 2 deletions pkg/logql/count_min_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package logql
import (
"container/heap"
"fmt"
"slices"
"strings"

"github.com/axiomhq/hyperloglog"
"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -187,16 +189,17 @@ func NewHeapCountMinSketchVector(ts int64, metricsLength, maxLabels int) HeapCou
}

func (v *HeapCountMinSketchVector) Add(metric labels.Labels, value float64) {
slices.SortFunc(metric, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
v.buffer = metric.Bytes(v.buffer)

v.F.Add(v.buffer, value)

// Add our metric if we haven't seen it

// TODO(karsten): There is a chance that the ids match but not the labels due to hash collision. Ideally there's
// an else block the compares the series labels. However, that's not trivial. Besides, instance.Series has the
// same issue in its deduping logic.
id := xxhash.Sum64(v.buffer)

// Add our metric if we haven't seen it
if _, ok := v.observed[id]; !ok {
heap.Push(v, metric)
v.observed[id] = struct{}{}
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package log

import (
"fmt"
"slices"
"sort"
"strings"
"sync"

"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -585,7 +587,8 @@ func (b *LabelsBuilder) LabelsResult() LabelsResult {

// Get all labels at once and sort them
b.buf = b.UnsortedLabels(b.buf)
sort.Sort(b.buf)
// sort.Sort(b.buf)
slices.SortFunc(b.buf, func(a, b labels.Label) int { return strings.Compare(a.Name, b.Name) })
hash := b.hasher.Hash(b.buf)

if cached, ok := b.resultCache[hash]; ok {
Expand Down
14 changes: 13 additions & 1 deletion pkg/logql/log/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,32 @@ func (p *fakeParseHints) ShouldExtract(key string) bool {
p.checkCount++
return key == p.label || p.extractAll
}

func (p *fakeParseHints) ShouldExtractPrefix(prefix string) bool {
return prefix == p.label || p.extractAll
}

func (p *fakeParseHints) NoLabels() bool {
return false
}

func (p *fakeParseHints) RecordExtracted(_ string) {
p.count++
}

func (p *fakeParseHints) AllRequiredExtracted() bool {
return !p.extractAll && p.count == 1
}

func (p *fakeParseHints) Reset() {
p.checkCount = 0
p.count = 0
}

func (p *fakeParseHints) PreserveError() bool {
return false
}

func (p *fakeParseHints) ShouldContinueParsingLine(_ string, _ *LabelsBuilder) bool {
return p.keepGoing
}
Expand Down Expand Up @@ -656,30 +663,36 @@ func Benchmark_Parser(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
line := []byte(tt.line)
b.Run("no labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})

b.Run("labels hints", func(b *testing.B) {
b.ReportAllocs()
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(tt.LabelParseHints, tt.LabelParseHints, false, false, "", nil)

for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})

b.Run("inline stages", func(b *testing.B) {
b.ReportAllocs()
stages := []Stage{NewStringLabelFilter(tt.LabelFilterParseHint)}
builder := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash())
builder.parserKeyHints = NewParserHint(nil, nil, false, false, ", nil", stages)
for n := 0; n < b.N; n++ {
builder.Reset()
_, _ = tt.s.Process(0, line, builder)
builder.LabelsResult()
}
})
})
Expand Down Expand Up @@ -1251,7 +1264,6 @@ func TestXExpressionParserFailures(t *testing.T) {
},
}
for _, tt := range tests {

t.Run(tt.name, func(t *testing.T) {
_, err := NewLogfmtExpressionParser([]LabelExtractionExpr{tt.expression}, false)

Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/sketch/cms.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func NewCountMinSketch(w, d uint32) (*CountMinSketch, error) {
Depth: d,
Width: w,
Counters: make2dslice(w, d),
HyperLogLog: hyperloglog.New16(),
HyperLogLog: hyperloglog.New16NoSparse(),
}, nil
}

Expand Down
20 changes: 13 additions & 7 deletions pkg/querier/queryrange/downstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,18 +170,20 @@ func (in instance) For(
go func() {
err := concurrency.ForEachJob(ctx, len(queries), in.parallelism, func(ctx context.Context, i int) error {
res, err := fn(queries[i])
if err != nil {
return err
}
response := logql.Resp{
I: i,
Res: res,
Err: err,
}

// Feed the result into the channel unless the work has completed.
select {
case <-ctx.Done():
case ch <- response:
}
return err
return nil
})
if err != nil {
ch <- logql.Resp{
Expand All @@ -192,15 +194,19 @@ func (in instance) For(
close(ch)
}()

var err error
for resp := range ch {
if resp.Err != nil {
return nil, resp.Err
if err != nil {
continue
}
if err := acc.Accumulate(ctx, resp.Res, resp.I); err != nil {
return nil, err
if resp.Err != nil {
err = resp.Err
continue
}
err = acc.Accumulate(ctx, resp.Res, resp.I)
}
return acc.Result(), nil

return acc.Result(), err
}

// convert to matrix
Expand Down
6 changes: 3 additions & 3 deletions pkg/ruler/base/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,9 @@ func grafanaLinkForExpression(expr, datasourceUID string) string {
}

marshaledExpression, _ := json.Marshal(exprStruct)
escapedExpression := url.QueryEscape(string(marshaledExpression))
str := `/explore?left={"queries":[%s]}`
return fmt.Sprintf(str, escapedExpression)
params := url.Values{}
params.Set("left", fmt.Sprintf(`{"queries":[%s]}`, marshaledExpression))
return `/explore?` + params.Encode()
}

// SendAlerts implements a rules.NotifyFunc for a Notifier.
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/base/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,7 @@ func TestSendAlerts(t *testing.T) {
Annotations: []labels.Label{{Name: "a2", Value: "v2"}},
StartsAt: time.Unix(2, 0),
EndsAt: time.Unix(3, 0),
GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression),
GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression),
},
},
},
Expand All @@ -1753,7 +1753,7 @@ func TestSendAlerts(t *testing.T) {
Annotations: []labels.Label{{Name: "a2", Value: "v2"}},
StartsAt: time.Unix(2, 0),
EndsAt: time.Unix(4, 0),
GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left={\"queries\":[%s]}", escapedExpression),
GeneratorURL: fmt.Sprintf("http://localhost:8080/explore?left=%%7B%%22queries%%22%%3A%%5B%s%%5D%%7D", escapedExpression),
},
},
},
Expand Down
Loading

0 comments on commit f0e52e6

Please sign in to comment.