diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go new file mode 100644 index 0000000000000..3febb0d8b1f57 --- /dev/null +++ b/integration/bloom_building_test.go @@ -0,0 +1,252 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/integration/client" + "github.com/grafana/loki/v3/integration/cluster" + "github.com/grafana/loki/v3/pkg/storage" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" + "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + "github.com/grafana/loki/v3/pkg/storage/chunk/client/local" + "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" + bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config" + "github.com/grafana/loki/v3/pkg/storage/types" + "github.com/grafana/loki/v3/pkg/util/mempool" +) + +func TestBloomBuilding(t *testing.T) { + const ( + nSeries = 10 //1000 + nLogsPerSeries = 50 + nBuilders = 5 + ) + + clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) { + c.SetSchemaVer("v13") + }) + defer func() { + require.NoError(t, clu.Cleanup()) + }() + + // First run distributor and ingester and write some data across many series. + tDistributor := clu.AddComponent( + "distributor", + "-target=distributor", + ) + tIngester := clu.AddComponent( + "ingester", + "-target=ingester", + "-ingester.flush-on-shutdown=true", + ) + require.NoError(t, clu.Run()) + + tenantID := "fake" + now := time.Now() + cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL()) + cliIngester := client.New(tenantID, "", tIngester.HTTPURL()) + cliIngester.Now = now + + // We now ingest some logs across many series. + series := make([]labels.Labels, 0, nSeries) + for i := 0; i < nSeries; i++ { + lbs := labels.FromStrings("job", fmt.Sprintf("job-%d", i)) + series = append(series, lbs) + + for j := 0; j < nLogsPerSeries; j++ { + require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map())) + } + } + + // restart ingester which should flush the chunks and index + require.NoError(t, tIngester.Restart()) + + // Start compactor and wait for compaction to finish. + tCompactor := clu.AddComponent( + "compactor", + "-target=compactor", + "-compactor.compaction-interval=10m", + "-compactor.run-once=true", + ) + require.NoError(t, clu.Run()) + + // Wait for compaction to finish. 
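+	// NOTE: there is no explicit signal to wait on here; since the compactor runs with
+	// -compactor.run-once=true, a single compaction cycle is expected and a short fixed
+	// sleep is assumed to be enough for this small dataset.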
+	time.Sleep(5 * time.Second)
+
+	// Now create the bloom planner and builders
+	tBloomPlanner := clu.AddComponent(
+		"bloom-planner",
+		"-target=bloom-planner",
+		"-bloom-build.enabled=true",
+		"-bloom-build.enable=true",
+		"-bloom-build.planner.interval=10m",
+		"-bloom-build.planner.min-table-offset=0",
+	)
+	require.NoError(t, clu.Run())
+
+	// Add several builders
+	builders := make([]*cluster.Component, 0, nBuilders)
+	for i := 0; i < nBuilders; i++ {
+		builder := clu.AddComponent(
+			"bloom-builder",
+			"-target=bloom-builder",
+			"-bloom-build.enabled=true",
+			"-bloom-build.enable=true",
+			"-bloom-build.builder.planner-address="+tBloomPlanner.GRPCURL(),
+		)
+		builders = append(builders, builder)
+	}
+	require.NoError(t, clu.Run())
+
+	// Wait for bloom build to finish
+	time.Sleep(5 * time.Second)
+
+	// Create a bloom store client to fetch metas and blocks.
+	bloomStore := createBloomStore(t, tBloomPlanner.ClusterSharedPath())
+
+	// Check that all series pushed are present in the metas and blocks.
+	checkSeriesInBlooms(t, now, tenantID, bloomStore, series)
+
+	// Push some more logs so TSDBs need to be updated.
+	for i := 0; i < nSeries; i++ {
+		lbs := labels.FromStrings("job", fmt.Sprintf("job-new-%d", i))
+		series = append(series, lbs)
+
+		for j := 0; j < nLogsPerSeries; j++ {
+			require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
+		}
+	}
+
+	// restart ingester which should flush the chunks and index
+	require.NoError(t, tIngester.Restart())
+
+	// Restart compactor and wait for compaction to finish so TSDBs are updated.
+	require.NoError(t, tCompactor.Restart())
+	time.Sleep(5 * time.Second)
+
+	// Restart bloom planner to trigger bloom build
+	require.NoError(t, tBloomPlanner.Restart())
+
+	// TODO(salvacorts): Implement retries in the builder so we don't need to restart the builders here.
+	for _, tBloomBuilder := range builders {
+		tBloomBuilder.AddFlags("-bloom-build.builder.planner-address=" + tBloomPlanner.GRPCURL())
+		require.NoError(t, tBloomBuilder.Restart())
+	}
+
+	// Wait for bloom build to finish
+	time.Sleep(5 * time.Second)
+
+	// Check that all pushed series (both the previous and the new ones) are present in the metas and blocks.
+	// The check expects exactly one meta per series, which verifies that outdated metas have been deleted.
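+	// `series` now holds both the original job-* and the new job-new-* label sets, so this single
+	// call verifies the series from both build rounds.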
+ checkSeriesInBlooms(t, now, tenantID, bloomStore, series) +} + +func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore { + logger := log.NewNopLogger() + //logger := log.NewLogfmtLogger(os.Stdout) + + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + From: parseDayTime("2023-09-01"), + IndexTables: config.IndexPeriodicTableConfig{ + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: "index_tsdb_", + Period: 24 * time.Hour, + }, + }, + IndexType: types.TSDBType, + ObjectType: types.StorageTypeFileSystem, + Schema: "v13", + RowShards: 16, + }, + }, + } + storageCfg := storage.Config{ + BloomShipperConfig: bloomshipperconfig.Config{ + WorkingDirectory: []string{sharedPath + "/bloom-store-test"}, + DownloadParallelism: 1, + BlocksCache: bloomshipperconfig.BlocksCacheConfig{ + SoftLimit: flagext.Bytes(10 << 20), + HardLimit: flagext.Bytes(20 << 20), + TTL: time.Hour, + }, + }, + FSConfig: local.FSConfig{ + Directory: sharedPath + "/fs-store-1", + }, + } + + reg := prometheus.NewPedanticRegistry() + metasCache := cache.NewNoopCache() + blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger) + + store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger) + require.NoError(t, err) + + return store +} + +func checkSeriesInBlooms( + t *testing.T, + now time.Time, + tenantID string, + bloomStore *bloomshipper.BloomStore, + series []labels.Labels, +) { + for _, lbs := range series { + seriesFP := model.Fingerprint(lbs.Hash()) + + metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{ + TenantID: tenantID, + Interval: bloomshipper.NewInterval(model.TimeFromUnix(now.Add(-24*time.Hour).Unix()), model.TimeFromUnix(now.Unix())), + Keyspace: v1.NewBounds(seriesFP, seriesFP), + }) + require.NoError(t, err) + + // Only one meta should be present. + require.Len(t, metas, 1) + + var relevantBlocks []bloomshipper.BlockRef + for _, block := range metas[0].Blocks { + if block.Cmp(uint64(seriesFP)) != v1.Overlap { + continue + } + relevantBlocks = append(relevantBlocks, block) + } + + // Only one block should be relevant. 
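+		// Zero relevant blocks would mean the series is missing from the blooms; more than one
+		// would mean the meta references several blocks overlapping the same fingerprint.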
+ require.Len(t, relevantBlocks, 1) + + queriers, err := bloomStore.FetchBlocks(context.Background(), relevantBlocks) + require.NoError(t, err) + require.Len(t, queriers, 1) + querier := queriers[0] + + require.NoError(t, querier.Seek(seriesFP)) + require.Equal(t, seriesFP, querier.At().Series.Fingerprint) + } +} + +func parseDayTime(s string) config.DayTime { + t, err := time.Parse("2006-01-02", s) + if err != nil { + panic(err) + } + return config.DayTime{ + Time: model.TimeFromUnix(t.Unix()), + } +} diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 52ef9e023f4f4..3a5638ab46654 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -11,9 +11,12 @@ import ( "github.com/go-kit/log/level" "github.com/google/uuid" "github.com/grafana/dskit/services" + "github.com/grafana/dskit/user" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -84,16 +87,25 @@ func (b *Builder) starting(_ context.Context) error { } func (b *Builder) stopping(_ error) error { + defer b.metrics.running.Set(0) + if b.client != nil { + // The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled + // We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server. + ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), "fake")) + if err != nil { + level.Error(b.logger).Log("msg", "failed to inject orgID into context", "err", err) + return nil + } + req := &protos.NotifyBuilderShutdownRequest{ BuilderID: b.ID, } - if _, err := b.client.NotifyBuilderShutdown(context.Background(), req); err != nil { + if _, err := b.client.NotifyBuilderShutdown(ctx, req); err != nil { level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err) } } - b.metrics.running.Set(0) return nil } @@ -111,6 +123,13 @@ func (b *Builder) running(ctx context.Context) error { b.client = protos.NewPlannerForBuilderClient(conn) + // The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled + // We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server. + ctx, err = user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, "fake")) + if err != nil { + return fmt.Errorf("failed to inject orgID into context: %w", err) + } + c, err := b.client.BuilderLoop(ctx) if err != nil { return fmt.Errorf("failed to start builder loop: %w", err) @@ -135,7 +154,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro // will be canceled and the loop will exit. 
protoTask, err := c.Recv() if err != nil { - if errors.Is(c.Context().Err(), context.Canceled) { + if status.Code(err) == codes.Canceled { level.Debug(b.logger).Log("msg", "builder loop context canceled") return nil } diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index b97195eac7a3a..485bf81ddd761 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -28,9 +28,10 @@ type Metrics struct { taskLost prometheus.Counter tasksFailed prometheus.Counter - buildStarted prometheus.Counter - buildCompleted *prometheus.CounterVec - buildTime *prometheus.HistogramVec + buildStarted prometheus.Counter + buildCompleted *prometheus.CounterVec + buildTime *prometheus.HistogramVec + buildLastSuccess prometheus.Gauge blocksDeleted prometheus.Counter metasDeleted prometheus.Counter @@ -111,6 +112,12 @@ func NewMetrics( Help: "Time spent during a builds cycle.", Buckets: prometheus.DefBuckets, }, []string{"status"}), + buildLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "build_last_successful_run_timestamp_seconds", + Help: "Unix timestamp of the last successful build cycle.", + }), blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index dd44c545ff36a..510ed8f96cece 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -188,6 +188,10 @@ func (p *Planner) runOne(ctx context.Context) error { defer func() { p.metrics.buildCompleted.WithLabelValues(status).Inc() p.metrics.buildTime.WithLabelValues(status).Observe(time.Since(start).Seconds()) + + if status == statusSuccess { + p.metrics.buildLastSuccess.SetToCurrentTime() + } }() p.metrics.buildStarted.Inc() @@ -219,6 +223,7 @@ func (p *Planner) runOne(ctx context.Context) error { level.Error(logger).Log("msg", "error computing tasks", "err", err) continue } + level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas)) var tenantTableEnqueuedTasks int resultsCh := make(chan *protos.TaskResult, len(tasks)) @@ -253,6 +258,11 @@ func (p *Planner) runOne(ctx context.Context) error { // Create a pool of workers to process table-tenant tuples. 
var wg sync.WaitGroup for tt, results := range tasksResultForTenantTable { + if results.tasksToWait == 0 { + // No tasks enqueued for this tenant-table tuple, skip processing + continue + } + wg.Add(1) go func(table config.DayTable, tenant string, results tenantTableTaskResults) { defer wg.Done() @@ -306,7 +316,6 @@ func (p *Planner) computeTasks( // Filter only the metas that overlap in the ownership range metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange) - level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metasInBounds)) // Find gaps in the TSDBs for this tenant/table gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger) @@ -314,6 +323,10 @@ func (p *Planner) computeTasks( level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err) continue } + if len(gaps) == 0 { + level.Debug(logger).Log("msg", "no gaps found") + continue + } for _, gap := range gaps { tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps)) @@ -331,7 +344,7 @@ func (p *Planner) processTenantTaskResults( totalTasks int, resultsCh <-chan *protos.TaskResult, ) error { - logger := log.With(p.logger, table, table.Addr(), "tenant", tenant) + logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant) level.Debug(logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks) newMetas := make([]bloomshipper.Meta, 0, totalTasks) @@ -379,8 +392,12 @@ func (p *Planner) processTenantTaskResults( combined := append(originalMetas, newMetas...) outdated := outdatedMetas(combined) - level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) + if len(outdated) == 0 { + level.Debug(logger).Log("msg", "no outdated metas found") + return nil + } + level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil { return fmt.Errorf("failed to delete outdated metas: %w", err) } @@ -494,6 +511,7 @@ func (p *Planner) loadTenantWork( tenant := tenants.At() if !p.limits.BloomCreationEnabled(tenant) { + level.Debug(p.logger).Log("msg", "bloom creation disabled for tenant", "tenant", tenant) continue } @@ -730,7 +748,7 @@ func (p *Planner) NotifyBuilderShutdown( req *protos.NotifyBuilderShutdownRequest, ) (*protos.NotifyBuilderShutdownResponse, error) { level.Debug(p.logger).Log("msg", "builder shutdown", "builder", req.BuilderID) - p.tasksQueue.UnregisterConsumerConnection(req.GetBuilderID()) + p.tasksQueue.NotifyConsumerShutdown(req.GetBuilderID()) return &protos.NotifyBuilderShutdownResponse{}, nil } @@ -820,7 +838,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer level.Debug(logger).Log( "msg", "task completed", "duration", time.Since(task.queueTime).Seconds(), - "retries", task.timesEnqueued.Load(), + "retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry ) p.removePendingTask(task) p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc() diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e405f5d762f0b..204cecd0ce3ad 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -40,6 +40,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" + bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos" "github.com/grafana/loki/v3/pkg/bloomgateway" 
"github.com/grafana/loki/v3/pkg/compactor" compactorclient "github.com/grafana/loki/v3/pkg/compactor/client" @@ -714,9 +715,9 @@ func (t *Loki) initStore() (services.Service, error) { } func (t *Loki) initBloomStore() (services.Service, error) { - // BloomStore is a dependency of IndexGateway, even when the BloomGateway is not enabled. - // Do not instantiate store and do not create a service. - if !t.Cfg.BloomGateway.Enabled { + // BloomStore is a dependency of IndexGateway and Bloom Planner & Builder. + // Do not instantiate store and do not create a service if neither ar enabled. + if !t.Cfg.BloomGateway.Enabled && !t.Cfg.BloomBuild.Enabled { return nil, nil } @@ -1584,7 +1585,7 @@ func (t *Loki) initBloomPlanner() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-planner") - return planner.New( + p, err := planner.New( t.Cfg.BloomBuild.Planner, t.Overrides, t.Cfg.SchemaConfig, @@ -1594,6 +1595,12 @@ func (t *Loki) initBloomPlanner() (services.Service, error) { logger, prometheus.DefaultRegisterer, ) + if err != nil { + return nil, err + } + + bloomprotos.RegisterPlannerForBuilderServer(t.Server.GRPC, p) + return p, nil } func (t *Loki) initBloomBuilder() (services.Service, error) { @@ -1601,7 +1608,7 @@ func (t *Loki) initBloomBuilder() (services.Service, error) { return nil, nil } - logger := log.With(util_log.Logger, "component", "bloom-worker") + logger := log.With(util_log.Logger, "component", "bloom-builder") return builder.New( t.Cfg.BloomBuild.Builder,