refactor(blooms): Fix and improve build metrics #13360

Merged
17 changes: 16 additions & 1 deletion pkg/bloombuild/planner/metrics.go
@@ -27,6 +27,7 @@ type Metrics struct {
tasksRequeued prometheus.Counter
taskLost prometheus.Counter

planningTime prometheus.Histogram
buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
buildTime *prometheus.HistogramVec
@@ -86,6 +87,14 @@ func NewMetrics(
Help: "Total number of tasks lost due to not being picked up by a builder and failed to be requeued.",
}),

planningTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "planning_time_seconds",
Help: "Time spent planning during a build cycle.",
// 1s --> 1h (steps of 1 minute)
Buckets: prometheus.LinearBuckets(1, 60, 60),
}),
buildStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
@@ -103,7 +112,13 @@
Subsystem: metricsSubsystem,
Name: "build_time_seconds",
Help: "Time spent during a builds cycle.",
Buckets: prometheus.DefBuckets,
// Buckets in seconds:
Buckets: append(
// 1s --> 1h (steps of 10 minutes)
prometheus.LinearBuckets(1, 600, 6),
// 1h --> 24h (steps of 1 hour)
prometheus.LinearBuckets(3600, 3600, 24)...,
),
}, []string{"status"}),
buildLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
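For reference, the standalone sketch below (not part of this diff) prints the bucket boundaries the two LinearBuckets expressions above produce, assuming the standard prometheus/client_golang API; the variable names are illustrative only.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// planning_time_seconds: 60 buckets starting at 1s, spaced 60s apart (~1s to ~59m).
	planning := prometheus.LinearBuckets(1, 60, 60)

	// build_time_seconds: 1s to ~50m in 10-minute steps, then 1h to 24h in hourly steps.
	build := append(
		prometheus.LinearBuckets(1, 600, 6),
		prometheus.LinearBuckets(3600, 3600, 24)...,
	)

	fmt.Println(planning[0], planning[1], planning[len(planning)-1]) // 1 61 3541
	fmt.Println(build[:6])                                           // [1 601 1201 1801 2401 3001]
	fmt.Println(build[6], build[len(build)-1])                       // 3600 86400
}

The two ranges keep fine resolution for sub-hour planning while still distinguishing multi-hour build cycles up to a day, instead of relying on prometheus.DefBuckets, which tops out at 10 seconds.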
62 changes: 46 additions & 16 deletions pkg/bloombuild/planner/planner.go
@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
@@ -251,11 +252,18 @@ func (p *Planner) runOne(ctx context.Context) error {
}
}

level.Debug(p.logger).Log("msg", "planning completed", "tasks", totalTasks)
p.metrics.planningTime.Observe(time.Since(start).Seconds())
level.Debug(p.logger).Log(
"msg", "planning completed",
"tenantTables", len(tasksResultForTenantTable),
"tasks", totalTasks,
"time", time.Since(start).Seconds(),
)

// Create a goroutine to process the results for each table tenant tuple
// TODO(salvacorts): This may end up creating too many goroutines.
// Create a pool of workers to process table-tenant tuples.
var tasksSucceed atomic.Int64
var wg sync.WaitGroup
for tt, results := range tasksResultForTenantTable {
if results.tasksToWait == 0 {
@@ -267,21 +275,40 @@
go func(table config.DayTable, tenant string, results tenantTableTaskResults) {
defer wg.Done()

if err := p.processTenantTaskResults(
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)

nSucceed, err := p.processTenantTaskResults(
ctx, table, tenant,
results.originalMetas, results.tasksToWait, results.resultsCh,
); err != nil {
level.Error(p.logger).Log("msg", "failed to process tenant task results", "err", err)
)
if err != nil {
level.Error(logger).Log("msg", "failed to process tenant task results", "err", err)
}

if nSucceed != results.tasksToWait {
level.Error(logger).Log(
"msg", "not all tasks succeeded for tenant table",
"tasks", results.tasksToWait,
"tasksSucceed", nSucceed,
"tasksFailed", results.tasksToWait-nSucceed,
)
}
tasksSucceed.Add(int64(nSucceed))
}(tt.table, tt.tenant, results)
}

level.Debug(p.logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks, "tenantTables", len(tasksResultForTenantTable))
level.Debug(p.logger).Log(
"msg", "waiting for all tasks to be completed",
"tenantTables", len(tasksResultForTenantTable),
"tasks", totalTasks,
)
wg.Wait()

status = statusSuccess
level.Info(p.logger).Log(
"msg", "bloom build iteration completed",
"tasks", totalTasks,
"tasksSucceed", tasksSucceed.Load(),
"duration", time.Since(start).Seconds(),
)
return nil
@@ -324,7 +351,6 @@ func (p *Planner) computeTasks(
continue
}
if len(gaps) == 0 {
level.Debug(logger).Log("msg", "no gaps found")
continue
}

@@ -343,28 +369,31 @@ func (p *Planner) processTenantTaskResults(
originalMetas []bloomshipper.Meta,
totalTasks int,
resultsCh <-chan *protos.TaskResult,
) error {
) (int, error) {
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
level.Debug(logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks)

var tasksSucceed int
newMetas := make([]bloomshipper.Meta, 0, totalTasks)
for i := 0; i < totalTasks; i++ {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
level.Error(logger).Log("msg", "planner context done with error", "err", err)
return err
return tasksSucceed, err
}

// No error or context canceled, just return
level.Debug(logger).Log("msg", "context done while waiting for task results")
return nil
return tasksSucceed, nil
case result := <-resultsCh:
if result == nil {
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Inc()
level.Error(logger).Log("msg", "received nil task result")
continue
}
if result.Error != nil {
p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusFailure).Inc()
level.Error(logger).Log(
"msg", "task failed",
"err", result.Error,
@@ -373,36 +402,39 @@
continue
}

p.metrics.tenantTasksCompleted.WithLabelValues(tenant, statusSuccess).Inc()
newMetas = append(newMetas, result.CreatedMetas...)
tasksSucceed++
}
}

level.Debug(logger).Log(
"msg", "all tasks completed",
"msg", "all tasks completed for tenant table",
"tasks", totalTasks,
"tasksSucceed", tasksSucceed,
"originalMetas", len(originalMetas),
"newMetas", len(newMetas),
)

if len(newMetas) == 0 {
// No new metas were created, nothing to delete
// Note: this would only happen if all tasks failed
return nil
return tasksSucceed, nil
}

combined := append(originalMetas, newMetas...)
outdated := outdatedMetas(combined)
if len(outdated) == 0 {
level.Debug(logger).Log("msg", "no outdated metas found")
return nil
return tasksSucceed, nil
}

level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
return fmt.Errorf("failed to delete outdated metas: %w", err)
return 0, fmt.Errorf("failed to delete outdated metas: %w", err)
}

return nil
return tasksSucceed, nil
}

func (p *Planner) deleteOutdatedMetasAndBlocks(
@@ -800,7 +832,6 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
if err != nil {
maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant)
if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries {
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusFailure).Inc()
p.removePendingTask(task)
level.Error(logger).Log(
"msg", "task failed after max retries",
Expand Down Expand Up @@ -842,7 +873,6 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant, statusSuccess).Inc()

// Send the result back to the task. The channel is buffered, so this should not block.
task.resultsChannel <- result
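The planner now aggregates per-goroutine success counts with a go.uber.org/atomic counter rather than a mutex. A minimal sketch of that pattern follows; the inputs are hypothetical, not the planner's real task results.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	// Hypothetical per tenant-table success counts returned by workers.
	perTenantTable := []int{2, 0, 3}

	var tasksSucceed atomic.Int64 // zero value is ready to use
	var wg sync.WaitGroup

	for _, n := range perTenantTable {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// Each worker adds its own count; the atomic counter makes this
			// safe without extra locking.
			tasksSucceed.Add(int64(n))
		}(n)
	}

	wg.Wait()
	fmt.Println("tasksSucceed:", tasksSucceed.Load()) // tasksSucceed: 5
}

Each worker only ever calls Add, and the single Load happens after wg.Wait(), so no further synchronization is needed before logging the total.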
15 changes: 11 additions & 4 deletions pkg/bloombuild/planner/planner_test.go
@@ -624,9 +624,10 @@ func Test_processTenantTaskResults(t *testing.T) {
for _, tc := range []struct {
name string

originalMetas []bloomshipper.Meta
taskResults []*protos.TaskResult
expectedMetas []bloomshipper.Meta
originalMetas []bloomshipper.Meta
taskResults []*protos.TaskResult
expectedMetas []bloomshipper.Meta
expectedTasksSucceed int
}{
{
name: "errors",
@@ -649,6 +650,7 @@
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 0,
},
{
name: "no new metas",
@@ -669,6 +671,7 @@
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 2,
},
{
name: "no original metas",
@@ -690,6 +693,7 @@
genMeta(0, 10, []int{0}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(10, 20, []int{0}, []bloomshipper.BlockRef{genBlockRef(10, 20)}),
},
expectedTasksSucceed: 2,
},
{
name: "single meta covers all original",
@@ -708,6 +712,7 @@
expectedMetas: []bloomshipper.Meta{
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
},
expectedTasksSucceed: 1,
},
{
name: "multi version ordering",
@@ -727,6 +732,7 @@
genMeta(0, 10, []int{1}, []bloomshipper.BlockRef{genBlockRef(0, 10)}),
genMeta(8, 10, []int{2}, []bloomshipper.BlockRef{genBlockRef(8, 10)}),
},
expectedTasksSucceed: 1,
},
} {
t.Run(tc.name, func(t *testing.T) {
@@ -755,7 +761,7 @@ func Test_processTenantTaskResults(t *testing.T) {
go func() {
defer wg.Done()

err = planner.processTenantTaskResults(
completed, err := planner.processTenantTaskResults(
ctx,
testTable,
"fakeTenant",
@@ -764,6 +770,7 @@
resultsCh,
)
require.NoError(t, err)
require.Equal(t, tc.expectedTasksSucceed, completed)
}()

for _, taskResult := range tc.taskResults {