
Commit

…into ServerSideApply-pr
Sheikh-Abubaker committed Feb 19, 2024
2 parents 435c09b + ffc61fb commit 480ff54
Showing 10 changed files with 100 additions and 357 deletions.
82 changes: 37 additions & 45 deletions pkg/bloomgateway/bloomgateway.go
@@ -82,10 +82,9 @@ var (
)

type metrics struct {
- queueDuration prometheus.Histogram
- inflightRequests prometheus.Summary
- chunkRefsUnfiltered prometheus.Counter
- chunkRefsFiltered prometheus.Counter
+ queueDuration prometheus.Histogram
+ inflightRequests prometheus.Summary
+ chunkRemovals *prometheus.CounterVec
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -106,29 +105,15 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
MaxAge: time.Minute,
AgeBuckets: 6,
}),
- chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
+ chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
- Name: "chunkrefs_pre_filtering",
- Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
- }),
- chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
- Namespace: namespace,
- Subsystem: subsystem,
- Name: "chunkrefs_post_filtering",
- Help: "Total amount of chunk refs post filtering.",
- }),
+ Name: "chunk_removals_total",
+ Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).",
+ }, []string{"state"}),
}
}
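The two plain counters (`chunkrefs_pre_filtering`, `chunkrefs_post_filtering`) are folded into a single counter vector partitioned by a `state` label with the values `accepted` and `dropped`. A minimal, self-contained sketch of that pattern is below; the `loki`/`bloom_gateway` namespace and subsystem values are assumptions for illustration, since the real values are passed into `newMetrics` by the caller.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counter vector partitioned by a "state" label, mirroring chunk_removals_total.
	removals := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki",          // assumed for illustration
		Subsystem: "bloom_gateway", // assumed for illustration
		Name:      "chunk_removals_total",
		Help:      "Total amount of removals received from the block querier partitioned by state.",
	}, []string{"state"})
	reg.MustRegister(removals)

	// One time series per state value.
	removals.WithLabelValues("accepted").Add(3)
	removals.WithLabelValues("dropped").Add(1)

	fmt.Println("registered chunk_removals_total with states accepted/dropped")
}
```

Summing over the `state` label recovers a single total, while the label still lets accepted and dropped removals be inspected separately.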

- func (m *metrics) addUnfilteredCount(n int) {
- m.chunkRefsUnfiltered.Add(float64(n))
- }
-
- func (m *metrics) addFilteredCount(n int) {
- m.chunkRefsFiltered.Add(float64(n))
- }

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -324,12 +309,8 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.New("from time must not be after through time")
}

- numChunksUnfiltered := len(req.Refs)
-
// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
- g.metrics.addUnfilteredCount(numChunksUnfiltered)
- g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
Expand Down Expand Up @@ -374,15 +355,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
g.pendingTasks.Add(task.ID, task)
})
- go consumeTask(ctx, task, tasksCh, logger)
+ go g.consumeTask(ctx, task, tasksCh)
}

responses := responsesPool.Get(numSeries)
defer responsesPool.Put(responses)
remaining := len(tasks)

- outer:
- for {
+ for remaining > 0 {
select {
case <-ctx.Done():
return nil, errors.Wrap(ctx.Err(), "request failed")
@@ -393,23 +373,17 @@ outer:
}
responses = append(responses, task.responses...)
remaining--
- if remaining == 0 {
- break outer
- }
}
}

- for _, o := range responses {
- if o.Removals.Len() == 0 {
- continue
- }
- removeNotMatchingChunks(req, o, g.logger)
- }
+ preFilterSeries := len(req.Refs)

- g.metrics.addUnfilteredCount(numChunksUnfiltered)
- g.metrics.addFilteredCount(len(req.Refs))
+ // TODO(chaudum): Don't wait for all responses before starting to filter chunks.
+ filtered := g.processResponses(req, responses)

- level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
+ postFilterSeries := len(req.Refs)

+ level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered)
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}
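The labeled `outer:` loop is replaced by a counter-based `for remaining > 0` loop that exits once every enqueued task has reported back, or aborts when the request context is cancelled. Below is a simplified sketch of that collection pattern, with ints standing in for tasks; the helper name `collect` is hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// collect waits for exactly n results from resCh, or aborts as soon as the
// request context is done — the counter-based loop that replaces the labeled
// "outer" loop.
func collect(ctx context.Context, resCh <-chan int, n int) ([]int, error) {
	out := make([]int, 0, n)
	remaining := n
	for remaining > 0 {
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("request failed: %w", ctx.Err())
		case res := <-resCh:
			out = append(out, res)
			remaining--
		}
	}
	return out, nil
}

func main() {
	resCh := make(chan int, 2)
	resCh <- 7
	resCh <- 9

	results, err := collect(context.Background(), resCh, 2)
	fmt.Println(results, err)
}
```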

@@ -419,16 +393,18 @@ outer:
// task is closed by the worker.
// Once the task is closed, it will send the task with the results from the
// block querier to the supplied task channel.
- func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) {
- logger = log.With(logger, "task", task.ID)
+ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Task) {
+ logger := log.With(g.logger, "task", task.ID)

for res := range task.resCh {
select {
case <-ctx.Done():
level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len()))
default:
level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
task.responses = append(task.responses, res)
g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len()))
}
}

@@ -441,21 +417,34 @@ func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log
}
}
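`consumeTask` uses a non-blocking `select` with a `default` branch: if the task context is already done, the partial result is logged and counted as `dropped`; otherwise it is appended to the task's responses and counted as `accepted`. A small stand-alone sketch of the same pattern, with ints in place of block-querier results and a hypothetical `consume` helper:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// consume drains resCh and partitions the results into accepted and dropped,
// depending on whether ctx is already done when a result arrives — the same
// non-blocking select/default pattern as consumeTask.
func consume(ctx context.Context, resCh <-chan int) (accepted, dropped []int) {
	for res := range resCh {
		select {
		case <-ctx.Done():
			// Context expired (e.g. client timeout): the result is dropped.
			dropped = append(dropped, res)
		default:
			// Context still alive: keep the partial result.
			accepted = append(accepted, res)
		}
	}
	return accepted, dropped
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	resCh := make(chan int, 2)
	resCh <- 1
	resCh <- 2
	close(resCh)

	acc, drop := consume(ctx, resCh)
	fmt.Println("accepted:", acc, "dropped:", drop)
}
```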

- func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) {
+ func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) {
+ for _, o := range responses {
+ if o.Removals.Len() == 0 {
+ continue
+ }
+ filtered += g.removeNotMatchingChunks(req, o)
+ }
+ return
+ }
+
+ func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) {

// binary search index of fingerprint
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})

// fingerprint not found
if idx >= len(req.Refs) {
- level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
+ level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
}

// if all chunks of a fingerprint are removed
// then remove the whole group from the response
if len(req.Refs[idx].Refs) == res.Removals.Len() {
filtered += len(req.Refs[idx].Refs)

req.Refs[idx] = nil // avoid leaking pointer
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
@@ -465,10 +454,13 @@ func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output,
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
filtered += 1

req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
}
}
return
}
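`removeNotMatchingChunks` relies on `req.Refs` being sorted by fingerprint: `sort.Search` locates the group in O(log n), and the group (or single chunk refs within it) is spliced out in place, nil-ing the removed entry so the backing array does not keep the pointer alive. The sketch below shows the same idea with hypothetical types rather than the gateway's own:

```go
package main

import (
	"fmt"
	"sort"
)

type group struct {
	fingerprint uint64
	checksums   []uint32
}

// dropGroup removes the group with the given fingerprint from a slice that is
// sorted by fingerprint, using the same sort.Search + splice pattern.
func dropGroup(groups []*group, fp uint64) []*group {
	idx := sort.Search(len(groups), func(i int) bool {
		return groups[i].fingerprint >= fp
	})
	if idx >= len(groups) || groups[idx].fingerprint != fp {
		return groups // fingerprint not present
	}
	groups[idx] = nil // avoid leaking the pointer through the backing array
	return append(groups[:idx], groups[idx+1:]...)
}

func main() {
	gs := []*group{
		{fingerprint: 10, checksums: []uint32{1}},
		{fingerprint: 20, checksums: []uint32{2, 3}},
		{fingerprint: 30, checksums: []uint32{4}},
	}
	for _, g := range dropGroup(gs, 20) {
		fmt.Println(g.fingerprint, g.checksums)
	}
}
```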
9 changes: 7 additions & 2 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -423,6 +423,9 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
}

func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
g := &Gateway{
logger: log.NewNopLogger(),
}
t.Run("removing chunks partially", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
@@ -450,7 +453,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
}},
},
}
- removeNotMatchingChunks(req, res, log.NewNopLogger())
+ n := g.removeNotMatchingChunks(req, res)
+ require.Equal(t, 2, n)
require.Equal(t, expected, req)
})

@@ -474,7 +478,8 @@ func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{},
}
- removeNotMatchingChunks(req, res, log.NewNopLogger())
+ n := g.removeNotMatchingChunks(req, res)
+ require.Equal(t, 3, n)
require.Equal(t, expected, req)
})

55 changes: 53 additions & 2 deletions pkg/bloomgateway/querier.go
@@ -5,17 +5,56 @@ import (
"sort"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

type querierMetrics struct {
chunksTotal prometheus.Counter
chunksFiltered prometheus.Counter
seriesTotal prometheus.Counter
seriesFiltered prometheus.Counter
}

func newQuerierMetrics(registerer prometheus.Registerer, namespace, subsystem string) *querierMetrics {
return &querierMetrics{
chunksTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_total",
Help: "Total amount of chunks pre filtering. Does not count chunks in failed requests.",
}),
chunksFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunks_filtered_total",
Help: "Total amount of chunks that have been filtered out. Does not count chunks in failed requests.",
}),
seriesTotal: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_total",
Help: "Total amount of series pre filtering. Does not count series in failed requests.",
}),
seriesFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "series_filtered_total",
Help: "Total amount of series that have been filtered out. Does not count series in failed requests.",
}),
}
}

// BloomQuerier is a store-level abstraction on top of Client
// It is used by the index gateway to filter ChunkRefs based on a given line filter expression.
type BloomQuerier struct {
- c Client
- logger log.Logger
+ c Client
+ logger log.Logger
+ metrics *querierMetrics
}

func NewQuerier(c Client, logger log.Logger) *BloomQuerier {
@@ -37,6 +76,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
defer groupedChunksRefPool.Put(grouped)
grouped = groupChunkRefs(chunkRefs, grouped)

preFilterChunks := len(chunkRefs)
preFilterSeries := len(grouped)

refs, err := bq.c.FilterChunks(ctx, tenant, from, through, grouped, filters...)
if err != nil {
return nil, err
@@ -55,6 +97,15 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from
})
}
}

postFilterChunks := len(result)
postFilterSeries := len(refs)

bq.metrics.chunksTotal.Add(float64(preFilterChunks))
bq.metrics.chunksFiltered.Add(float64(preFilterChunks - postFilterChunks))
bq.metrics.seriesTotal.Add(float64(preFilterSeries))
bq.metrics.seriesFiltered.Add(float64(preFilterSeries - postFilterSeries))

return result, nil
}
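The querier metrics are fed from the difference between the sizes before and after filtering, so `chunks_filtered_total` and `series_filtered_total` count what was removed rather than what remains. A minimal sketch of that accounting with plain integers in place of Prometheus counters (the `filterStats` type is illustrative only):

```go
package main

import "fmt"

// filterStats mirrors the accounting added to FilterChunkRefs: "filtered"
// is derived as pre-filter size minus post-filter size.
type filterStats struct {
	chunksTotal, chunksFiltered int
	seriesTotal, seriesFiltered int
}

func (s *filterStats) observe(preChunks, postChunks, preSeries, postSeries int) {
	s.chunksTotal += preChunks
	s.chunksFiltered += preChunks - postChunks
	s.seriesTotal += preSeries
	s.seriesFiltered += preSeries - postSeries
}

func main() {
	var s filterStats
	// e.g. 100 chunks across 10 series before filtering, 60 chunks across 8 series after.
	s.observe(100, 60, 10, 8)
	fmt.Printf("%+v\n", s)
}
```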

2 changes: 1 addition & 1 deletion pkg/bloomgateway/worker.go
@@ -163,7 +163,7 @@ func (w *worker) running(_ context.Context) error {
err = p.run(taskCtx, tasks)

if err != nil {
- w.metrics.processDuration.WithLabelValues(w.id, labelSuccess).Observe(time.Since(start).Seconds())
+ w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())
w.metrics.tasksProcessed.WithLabelValues(w.id, labelFailure).Add(float64(len(tasks)))
level.Error(w.logger).Log("msg", "failed to process tasks", "err", err)
} else {
1 change: 1 addition & 0 deletions production/helm/loki/CHANGELOG.md
@@ -16,6 +16,7 @@ Entries should include a reference to the pull request that introduced the chang
## 5.43.2

- [ENHANCEMENT] Added missing default values to support ServerSideApply
- [BUGFIX] Added `alibabacloud` to `isUsingObjectStorage` check.

## 5.43.1

2 changes: 1 addition & 1 deletion production/helm/loki/templates/_helpers.tpl
@@ -597,7 +597,7 @@ Create the service endpoint including port for MinIO.

{{/* Determine if deployment is using object storage */}}
{{- define "loki.isUsingObjectStorage" -}}
- {{- or (eq .Values.loki.storage.type "gcs") (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "azure") (eq .Values.loki.storage.type "swift") -}}
+ {{- or (eq .Values.loki.storage.type "gcs") (eq .Values.loki.storage.type "s3") (eq .Values.loki.storage.type "azure") (eq .Values.loki.storage.type "swift") (eq .Values.loki.storage.type "alibabacloud") -}}
{{- end -}}
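The helper is a chain of `or`/`eq` checks over `.Values.loki.storage.type`, so supporting `alibabacloud` is a single extra `eq` term. The sketch below evaluates the same pipeline with Go's `text/template` against a hypothetical value struct; the `.Type` field stands in for the chart's actual `.Values` layout.

```go
package main

import (
	"os"
	"text/template"
)

// The same or/eq pipeline as loki.isUsingObjectStorage, evaluated against a
// hypothetical value struct (.Type stands in for .Values.loki.storage.type).
const helper = `{{- or (eq .Type "gcs") (eq .Type "s3") (eq .Type "azure") (eq .Type "swift") (eq .Type "alibabacloud") -}}`

func main() {
	tmpl := template.Must(template.New("isUsingObjectStorage").Parse(helper))
	// Prints "true" now that alibabacloud is part of the check.
	if err := tmpl.Execute(os.Stdout, struct{ Type string }{Type: "alibabacloud"}); err != nil {
		panic(err)
	}
}
```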

{{/* Configure the correct name for the memberlist service */}}