Skip to content

Commit

Permalink
chore(blooms): Various minor code cleanups (#13332)
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jun 26, 2024
1 parent 40ee766 commit 3b0502d
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 48 deletions.
11 changes: 0 additions & 11 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"io"
"math"
"sort"

"github.com/go-kit/log"
Expand Down Expand Up @@ -208,7 +207,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
return nil, nil
}

firstFp, lastFp := uint64(math.MaxUint64), uint64(0)
pos := make(map[string]int)
servers := make([]addrWithGroups, 0, len(blocks))
for _, blockWithSeries := range blocks {
Expand All @@ -217,15 +215,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, _ string, interval blo
return nil, errors.Wrapf(err, "server address for block: %s", blockWithSeries.block)
}

// min/max fingerprint needed for the cache locality score
first, last := getFirstLast(blockWithSeries.series)
if first.Fingerprint < firstFp {
firstFp = first.Fingerprint
}
if last.Fingerprint > lastFp {
lastFp = last.Fingerprint
}

if idx, found := pos[addr]; found {
servers[idx].groups = append(servers[idx].groups, blockWithSeries.series...)
servers[idx].blocks = append(servers[idx].blocks, blockWithSeries.block.String())
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func shortRef(f, t model.Time, c uint32) *logproto.ShortRef {

func TestGatewayClient_MergeSeries(t *testing.T) {
inputs := [][]*logproto.GroupedChunkRefs{
// response 1
// response 1 -- sorted
{
{Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks
},
// response 2
// response 2 -- not sorted
{
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
{Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks
{Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks
{Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
interval: t.interval,
table: t.table,
ctx: t.ctx,
done: make(chan struct{}),
done: t.done,
}
}

Expand Down
26 changes: 8 additions & 18 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package bloomgateway

import (
"context"
"math"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -35,21 +34,12 @@ type processor struct {
metrics *workerMetrics
}

func (p *processor) run(ctx context.Context, tasks []Task) error {
return p.runWithBounds(ctx, tasks, v1.MultiFingerprintBounds{{Min: 0, Max: math.MaxUint64}})
}

func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
func (p *processor) processTasks(ctx context.Context, tasks []Task) error {
tenant := tasks[0].tenant
level.Info(p.logger).Log(
"msg", "process tasks with bounds",
"tenant", tenant,
"tasks", len(tasks),
"bounds", len(bounds),
)
level.Info(p.logger).Log("msg", "process tasks", "tenant", tenant, "tasks", len(tasks))

for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
err := p.processTasks(ctx, tenant, ts, bounds, tasks)
err := p.processTasksForDay(ctx, tenant, ts, tasks)
if err != nil {
for _, task := range tasks {
task.CloseWithError(err)
Expand All @@ -63,7 +53,7 @@ func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.M
return nil
}

func (p *processor) processTasks(ctx context.Context, tenant string, day config.DayTime, _ v1.MultiFingerprintBounds, tasks []Task) error {
func (p *processor) processTasksForDay(ctx context.Context, tenant string, day config.DayTime, tasks []Task) error {
level.Info(p.logger).Log("msg", "process tasks for day", "tenant", tenant, "tasks", len(tasks), "day", day.String())
var duration time.Duration

Expand All @@ -72,10 +62,10 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
blocksRefs = append(blocksRefs, task.blocks...)
}

data := partitionTasks(tasks, blocksRefs)
tasksByBlock := partitionTasksByBlock(tasks, blocksRefs)

refs := make([]bloomshipper.BlockRef, 0, len(data))
for _, block := range data {
refs := make([]bloomshipper.BlockRef, 0, len(tasksByBlock))
for _, block := range tasksByBlock {
refs = append(refs, block.ref)
}

Expand Down Expand Up @@ -103,7 +93,7 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
}

startProcess := time.Now()
res := p.processBlocks(ctx, bqs, data)
res := p.processBlocks(ctx, bqs, tasksByBlock)
duration = time.Since(startProcess)

for _, t := range tasks {
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}

err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.NoError(t, err)
require.Equal(t, int64(0), results.Load())
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}

err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.NoError(t, err)
require.Equal(t, int64(len(swb.series)), results.Load())
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestProcessor(t *testing.T) {
}(tasks[i])
}

err := p.run(ctx, tasks)
err := p.processTasks(ctx, tasks)
wg.Wait()
require.Errorf(t, err, "store failed")
require.Equal(t, int64(0), results.Load())
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type blockWithTasks struct {
tasks []Task
}

func partitionTasks(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
func partitionTasksByBlock(tasks []Task, blocks []bloomshipper.BlockRef) []blockWithTasks {
result := make([]blockWithTasks, 0, len(blocks))

for _, block := range blocks {
Expand Down
37 changes: 33 additions & 4 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func mkBlockRef(minFp, maxFp uint64) bloomshipper.BlockRef {
}
}

func TestPartitionTasks(t *testing.T) {
func TestPartitionTasksByBlock(t *testing.T) {

t.Run("consecutive block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
Expand All @@ -93,7 +93,7 @@ func TestPartitionTasks(t *testing.T) {
tasks[i%nTasks].series = append(tasks[i%nTasks].series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionTasks(tasks, bounds)
results := partitionTasksByBlock(tasks, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range

actualFingerprints := make([]*logproto.GroupedChunkRefs, 0, nSeries)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestPartitionTasks(t *testing.T) {
task.series = append(task.series, &logproto.GroupedChunkRefs{Fingerprint: uint64(i)})
}

results := partitionTasks([]Task{task}, bounds)
results := partitionTasksByBlock([]Task{task}, bounds)
require.Equal(t, 3, len(results)) // ensure we only return bounds in range
for _, res := range results {
// ensure we have the right number of tasks per bound
Expand All @@ -153,9 +153,38 @@ func TestPartitionTasks(t *testing.T) {
},
}

results := partitionTasks(tasks, bounds)
results := partitionTasksByBlock(tasks, bounds)
require.Len(t, results, 0)
})

t.Run("overlapping and unsorted block ranges", func(t *testing.T) {
bounds := []bloomshipper.BlockRef{
mkBlockRef(5, 14),
mkBlockRef(0, 9),
mkBlockRef(10, 19),
}

tasks := []Task{
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 6},
},
},
{
series: []*logproto.GroupedChunkRefs{
{Fingerprint: 12},
},
},
}

expected := []blockWithTasks{
{ref: bounds[0], tasks: tasks}, // both tasks
{ref: bounds[1], tasks: tasks[:1]}, // first task
{ref: bounds[2], tasks: tasks[1:]}, // second task
}
results := partitionTasksByBlock(tasks, bounds)
require.Equal(t, expected, results)
})
}

func TestPartitionRequest(t *testing.T) {
Expand Down
8 changes: 1 addition & 7 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/queue"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

Expand Down Expand Up @@ -92,7 +90,6 @@ func (w *worker) running(_ context.Context) error {
w.metrics.tasksDequeued.WithLabelValues(w.id, labelSuccess).Add(float64(len(items)))

tasks := make([]Task, 0, len(items))
var mb v1.MultiFingerprintBounds
for _, item := range items {
task, ok := item.(Task)
if !ok {
Expand All @@ -104,13 +101,10 @@ func (w *worker) running(_ context.Context) error {
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
tasks = append(tasks, task)

first, last := getFirstLast(task.series)
mb = mb.Union(v1.NewBounds(model.Fingerprint(first.Fingerprint), model.Fingerprint(last.Fingerprint)))
}

start = time.Now()
err = p.runWithBounds(taskCtx, tasks, mb)
err = p.processTasks(taskCtx, tasks)

if err != nil {
w.metrics.processDuration.WithLabelValues(w.id, labelFailure).Observe(time.Since(start).Seconds())
Expand Down

0 comments on commit 3b0502d

Please sign in to comment.