diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 5c3dba3d15de2..446a48adcd979 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2248,6 +2248,16 @@ bloom_shipper: # Working directory to store downloaded Bloom Blocks. # CLI flag: -bloom.shipper.working-directory [working_directory: | default = "bloom-shipper"] + + blocks_downloading_queue: + # The count of parallel workers that download Bloom Blocks. + # CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count + [workers_count: | default = 100] + + # Maximum number of task in queue per tenant per bloom-gateway. Enqueuing + # the tasks above this limit will fail an error. + # CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant + [max_tasks_enqueued_per_tenant: | default = 10000] ``` ### chunk_store_config @@ -2990,6 +3000,10 @@ shard_streams: # CLI flag: -bloom-compactor.false-positive-rate [bloom_false_positive_rate: | default = 0.01] +# Maximum number of blocks will be downloaded in parallel by the Bloom Gateway. +# CLI flag: -bloom-gateway.blocks-downloading-parallelism +[bloom_gateway_blocks_downloading_parallelism: | default = 50] + # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: | default = false] diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index b0b01c34dbaf6..425d6713e92f9 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -187,7 +187,7 @@ type Gateway struct { } // New returns a new instance of the Bloom Gateway. -func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { +func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) { g := &Gateway{ cfg: cfg, logger: logger, @@ -205,7 +205,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s return nil, err } - bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, logger) + bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, overrides, logger, reg) if err != nil { return nil, err } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index c0d9ffdfae230..0b6a207362ac6 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -82,7 +82,7 @@ func TestBloomGateway_StartStopService(t *testing.T) { }, } - gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -142,7 +142,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -188,7 +188,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) { reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) require.NoError(t, err) ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00") @@ -212,7 +212,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { t.Run("gateway tracks active users", func(t *testing.T) { reg := prometheus.NewRegistry() - gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg) + gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg) require.NoError(t, err) err = services.StartAndAwaitRunning(context.Background(), gw) @@ -248,3 +248,21 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { require.ElementsMatch(t, tenants, gw.activeUsers.ActiveUsers()) }) } + +type fakeLimits struct { +} + +func (f fakeLimits) BloomGatewayShardSize(_ string) int { + //TODO implement me + panic("implement me") +} + +func (f fakeLimits) BloomGatewayEnabled(_ string) bool { + //TODO implement me + panic("implement me") +} + +func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int { + //TODO implement me + panic("implement me") +} diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 4bd288ccfe43b..09926284b3794 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -38,6 +38,7 @@ var ( type Limits interface { BloomGatewayShardSize(tenantID string) int BloomGatewayEnabled(tenantID string) bool + BloomGatewayBlocksDownloadingParallelism(userID string) int } type ShardingStrategy interface { diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index e0e8ab4d1f88d..bf450f852be5a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1263,7 +1263,7 @@ func (t *Loki) initBloomGateway() (services.Service, error) { shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger) - gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer) + gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer) if err != nil { return nil, err } diff --git a/pkg/storage/bloom/v1/block_writer.go b/pkg/storage/bloom/v1/block_writer.go index 317d1e598414a..99ab65ef9cd40 100644 --- a/pkg/storage/bloom/v1/block_writer.go +++ b/pkg/storage/bloom/v1/block_writer.go @@ -12,8 +12,8 @@ import ( ) const ( - bloomFileName = "bloom" - seriesFileName = "series" + BloomFileName = "bloom" + SeriesFileName = "series" ) type BlockWriter interface { @@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error { return errors.Wrap(err, "creating bloom block dir") } - b.index, err = os.Create(filepath.Join(b.dir, seriesFileName)) + b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName)) if err != nil { return errors.Wrap(err, "creating series file") } - b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName)) + b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName)) if err != nil { return errors.Wrap(err, "creating bloom file") } diff --git a/pkg/storage/bloom/v1/reader.go b/pkg/storage/bloom/v1/reader.go index e4de9609b9082..d5c70a2b64d83 100644 --- a/pkg/storage/bloom/v1/reader.go +++ b/pkg/storage/bloom/v1/reader.go @@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader { func (r *DirectoryBlockReader) Init() error { if !r.initialized { var err error - r.index, err = os.Open(filepath.Join(r.dir, seriesFileName)) + r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName)) if err != nil { return errors.Wrap(err, "opening series file") } - r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName)) + r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName)) if err != nil { return errors.Wrap(err, "opening bloom file") } diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go new file mode 100644 index 0000000000000..b6721db88640e --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader.go @@ -0,0 +1,230 @@ +package bloomshipper + +import ( + "context" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/loki/pkg/queue" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/constants" +) + +type blockDownloader struct { + logger log.Logger + + workingDirectory string + queueMetrics *queue.Metrics + queue *queue.RequestQueue + blockClient BlockClient + limits Limits + activeUsersService *util.ActiveUsersCleanupService + + ctx context.Context + manager *services.Manager + onWorkerStopCallback func() +} + +func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) { + queueMetrics := queue.NewMetrics(reg, constants.Loki, "bloom_blocks_downloader") + //add cleanup service + downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, queueMetrics) + activeUsersService := util.NewActiveUsersCleanupWithDefaultValues(queueMetrics.Cleanup) + + ctx := context.Background() + manager, err := services.NewManager(downloadingQueue, activeUsersService) + if err != nil { + return nil, fmt.Errorf("error creating service manager: %w", err) + } + err = services.StartManagerAndAwaitHealthy(ctx, manager) + if err != nil { + return nil, fmt.Errorf("error starting service manager: %w", err) + } + + b := &blockDownloader{ + ctx: ctx, + logger: logger, + workingDirectory: config.WorkingDirectory, + queueMetrics: queueMetrics, + queue: downloadingQueue, + blockClient: blockClient, + activeUsersService: activeUsersService, + limits: limits, + manager: manager, + onWorkerStopCallback: onWorkerStopNoopCallback, + } + + for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ { + go b.serveDownloadingTasks(fmt.Sprintf("worker-%d", i)) + } + return b, nil +} + +type BlockDownloadingTask struct { + ctx context.Context + block BlockRef + // ErrCh is a send-only channel to write an error to + ErrCh chan<- error + // ResultsCh is a send-only channel to return the block querier for the downloaded block + ResultsCh chan<- blockWithQuerier +} + +func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- blockWithQuerier, errCh chan<- error) *BlockDownloadingTask { + return &BlockDownloadingTask{ + ctx: ctx, + block: block, + ErrCh: errCh, + ResultsCh: resCh, + } +} + +// noop implementation +var onWorkerStopNoopCallback = func() {} + +func (d *blockDownloader) serveDownloadingTasks(workerID string) { + logger := log.With(d.logger, "worker", workerID) + level.Debug(logger).Log("msg", "starting worker") + + d.queue.RegisterConsumerConnection(workerID) + defer d.queue.UnregisterConsumerConnection(workerID) + //this callback is used only in the tests to assert that worker is stopped + defer d.onWorkerStopCallback() + + idx := queue.StartIndexWithLocalQueue + + for { + item, newIdx, err := d.queue.Dequeue(d.ctx, idx, workerID) + if err != nil { + if !errors.Is(err, queue.ErrStopped) && !errors.Is(err, context.Canceled) { + level.Error(logger).Log("msg", "failed to dequeue task", "err", err) + continue + } + level.Info(logger).Log("msg", "stopping worker") + return + } + task, ok := item.(*BlockDownloadingTask) + if !ok { + level.Error(logger).Log("msg", "failed to cast to BlockDownloadingTask", "item", fmt.Sprintf("%+v", item), "type", fmt.Sprintf("%T", item)) + continue + } + + idx = newIdx + blockPath := task.block.BlockPath + //todo add cache before downloading + level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath) + block, err := d.blockClient.GetBlock(task.ctx, task.block) + if err != nil { + level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err) + task.ErrCh <- fmt.Errorf("error downloading the block %s : %w", blockPath, err) + continue + } + directory, err := d.extractBlock(&block, time.Now()) + if err != nil { + level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err) + task.ErrCh <- fmt.Errorf("error extracting the block %s : %w", blockPath, err) + continue + } + level.Debug(d.logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory) + blockQuerier := d.createBlockQuerier(directory) + task.ResultsCh <- blockWithQuerier{ + BlockRef: task.block, + BlockQuerier: blockQuerier, + } + } +} + +func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) { + d.activeUsersService.UpdateUserTimestamp(tenantID, time.Now()) + // we need to have errCh with size that can keep max count of errors to prevent the case when + // the queue worker reported the error to this channel before the current goroutine + // and this goroutine will go to the deadlock because it won't be able to report an error + // because nothing reads this channel at this moment. + errCh := make(chan error, len(references)) + blocksCh := make(chan blockWithQuerier, len(references)) + + downloadingParallelism := d.limits.BloomGatewayBlocksDownloadingParallelism(tenantID) + for _, reference := range references { + task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh) + level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath) + err := d.queue.Enqueue(tenantID, nil, task, downloadingParallelism, nil) + if err != nil { + errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err) + return blocksCh, errCh + } + } + return blocksCh, errCh +} + +type blockWithQuerier struct { + BlockRef + *v1.BlockQuerier +} + +// extract the files into directory and returns absolute path to this directory. +func (d *blockDownloader) extractBlock(block *Block, ts time.Time) (string, error) { + workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) + err := os.MkdirAll(workingDirectoryPath, os.ModePerm) + if err != nil { + return "", fmt.Errorf("can not create directory to extract the block: %w", err) + } + archivePath, err := writeDataToTempFile(workingDirectoryPath, block) + if err != nil { + return "", fmt.Errorf("error writing data to temp file: %w", err) + } + defer func() { + os.Remove(archivePath) + // todo log err + }() + err = extractArchive(archivePath, workingDirectoryPath) + if err != nil { + return "", fmt.Errorf("error extracting archive: %w", err) + } + return workingDirectoryPath, nil +} + +func (d *blockDownloader) createBlockQuerier(directory string) *v1.BlockQuerier { + reader := v1.NewDirectoryBlockReader(directory) + block := v1.NewBlock(reader) + return v1.NewBlockQuerier(block) +} + +func (d *blockDownloader) stop() { + _ = services.StopManagerAndAwaitStopped(d.ctx, d.manager) +} + +func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) { + defer block.Data.Close() + archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:]) + + archiveFile, err := os.Create(archivePath) + if err != nil { + return "", fmt.Errorf("error creating empty file to store the archiver: %w", err) + } + defer archiveFile.Close() + _, err = io.Copy(archiveFile, block.Data) + if err != nil { + return "", fmt.Errorf("error writing data to archive file: %w", err) + } + return archivePath, nil +} + +func extractArchive(archivePath string, workingDirectoryPath string) error { + file, err := os.Open(archivePath) + if err != nil { + return fmt.Errorf("error opening archive file %s: %w", file.Name(), err) + } + return v1.UnTarGz(workingDirectoryPath, file) +} diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go new file mode 100644 index 0000000000000..b69d036d30e37 --- /dev/null +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go @@ -0,0 +1,168 @@ +package bloomshipper + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" + "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" + "github.com/grafana/loki/pkg/validation" +) + +func Test_blockDownloader_downloadBlocks(t *testing.T) { + overrides, err := validation.NewOverrides(validation.Limits{BloomGatewayBlocksDownloadingParallelism: 20}, nil) + require.NoError(t, err) + workingDirectory := t.TempDir() + + blockReferences, blockClient := createFakeBlocks(t, 20) + blockClient.responseDelay = 100 * time.Millisecond + workersCount := 10 + downloader, err := newBlockDownloader(config.Config{ + WorkingDirectory: workingDirectory, + BlocksDownloadingQueue: config.DownloadingQueueConfig{ + WorkersCount: workersCount, + MaxTasksEnqueuedPerTenant: 20, + }, + }, blockClient, overrides, log.NewNopLogger(), prometheus.DefaultRegisterer) + stoppedWorkersCount := atomic.NewInt32(0) + downloader.onWorkerStopCallback = func() { + stoppedWorkersCount.Inc() + } + require.NoError(t, err) + blocksCh, errorsCh := downloader.downloadBlocks(context.Background(), "fake", blockReferences) + downloadedBlocks := make(map[string]any, len(blockReferences)) + done := make(chan bool) + go func() { + for i := 0; i < 20; i++ { + block := <-blocksCh + downloadedBlocks[block.BlockPath] = nil + } + done <- true + }() + + select { + //20 blocks, 10 workers, fixed delay 100ms per block: the total downloading time must be ~200ms. + case <-time.After(2 * time.Second): + t.Fatalf("test must complete before the timeout") + case err := <-errorsCh: + require.NoError(t, err) + case <-done: + } + require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") + + downloader.stop() + require.Eventuallyf(t, func() bool { + return stoppedWorkersCount.Load() == int32(workersCount) + }, 1*time.Second, 10*time.Millisecond, "expected all %d workers to be stopped", workersCount) +} + +// creates fake blocks and returns map[block-path]Block and mockBlockClient +func createFakeBlocks(t *testing.T, count int) ([]BlockRef, *mockBlockClient) { + mockData := make(map[string]Block, count) + refs := make([]BlockRef, 0, count) + for i := 0; i < count; i++ { + archive, _, _ := createBlockArchive(t) + block := Block{ + BlockRef: BlockRef{ + BlockPath: fmt.Sprintf("block-path-%d", i), + }, + Data: archive, + } + mockData[block.BlockPath] = block + refs = append(refs, block.BlockRef) + } + return refs, &mockBlockClient{mockData: mockData} +} + +type mockBlockClient struct { + responseDelay time.Duration + mockData map[string]Block +} + +func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (Block, error) { + time.Sleep(m.responseDelay) + block, exists := m.mockData[reference.BlockPath] + if exists { + return block, nil + } + + return block, fmt.Errorf("block %s is not found in mockData", reference.BlockPath) +} + +func (m *mockBlockClient) PutBlocks(_ context.Context, _ []Block) ([]Block, error) { + panic("implement me") +} + +func (m *mockBlockClient) DeleteBlocks(_ context.Context, _ []BlockRef) error { + panic("implement me") +} + +func Test_blockDownloader_extractBlock(t *testing.T) { + blockFile, bloomFileContent, seriesFileContent := createBlockArchive(t) + + workingDir := t.TempDir() + downloader := &blockDownloader{workingDirectory: workingDir} + ts := time.Now().UTC() + block := Block{ + BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"}, + Data: blockFile, + } + + actualPath, err := downloader.extractBlock(&block, ts) + + require.NoError(t, err) + expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) + require.Equal(t, expectedPath, actualPath, + "expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix") + require.FileExists(t, filepath.Join(expectedPath, v1.BloomFileName)) + require.FileExists(t, filepath.Join(expectedPath, v1.SeriesFileName)) + + actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.BloomFileName)) + require.NoError(t, err) + require.Equal(t, bloomFileContent, string(actualBloomFileContent)) + + actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, v1.SeriesFileName)) + require.NoError(t, err) + require.Equal(t, seriesFileContent, string(actualSeriesFileContent)) +} + +func createBlockArchive(t *testing.T) (*os.File, string, string) { + dir := t.TempDir() + mockBlockDir := filepath.Join(dir, "mock-block-dir") + err := os.MkdirAll(mockBlockDir, 0777) + require.NoError(t, err) + bloomFile, err := os.Create(filepath.Join(mockBlockDir, v1.BloomFileName)) + require.NoError(t, err) + bloomFileContent := uuid.NewString() + _, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent))) + require.NoError(t, err) + + seriesFile, err := os.Create(filepath.Join(mockBlockDir, v1.SeriesFileName)) + require.NoError(t, err) + seriesFileContent := uuid.NewString() + _, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent))) + require.NoError(t, err) + + blockFilePath := filepath.Join(dir, "test-block-archive") + file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700) + require.NoError(t, err) + err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir)) + require.NoError(t, err) + + blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700) + require.NoError(t, err) + return blockFile, bloomFileContent, seriesFileContent +} diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index a68959e1d908e..5709bf8866f21 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -79,7 +79,7 @@ type Block struct { } type BlockClient interface { - GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error) + GetBlock(ctx context.Context, reference BlockRef) (Block, error) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) DeleteBlocks(ctx context.Context, blocks []BlockRef) error } @@ -190,42 +190,23 @@ func (b *BloomClient) DeleteMeta(ctx context.Context, meta Meta) error { return b.periodicObjectClients[periodFrom].DeleteObject(ctx, key) } -// GetBlocks downloads all the blocks from objectStorage in parallel and sends the downloaded blocks -// via the channel Block that is closed only if all the blocks are downloaded without errors. -// If an error happens, the error will be sent via error channel. -func (b *BloomClient) GetBlocks(ctx context.Context, references []BlockRef) (chan Block, chan error) { - blocksChannel := make(chan Block, len(references)) - errChannel := make(chan error) - go func() { - //todo move concurrency to the config - err := concurrency.ForEachJob(ctx, len(references), 100, func(ctx context.Context, idx int) error { - reference := references[idx] - period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp) - if err != nil { - return fmt.Errorf("error while period lookup: %w", err) - } - objectClient := b.periodicObjectClients[period] - readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) - if err != nil { - return fmt.Errorf("error while fetching object from storage: %w", err) - } - blocksChannel <- Block{ - BlockRef: reference, - Data: readCloser, - } - return nil - }) - if err != nil { - errChannel <- fmt.Errorf("error downloading block file: %w", err) - return - } - //close blocks channel only if there is no error - close(blocksChannel) - }() - return blocksChannel, errChannel +// GetBlock downloads the blocks from objectStorage and returns the downloaded block +func (b *BloomClient) GetBlock(ctx context.Context, reference BlockRef) (Block, error) { + period, err := findPeriod(b.periodicConfigs, reference.StartTimestamp) + if err != nil { + return Block{}, fmt.Errorf("error while period lookup: %w", err) + } + objectClient := b.periodicObjectClients[period] + readCloser, _, err := objectClient.GetObject(ctx, createBlockObjectKey(reference.Ref)) + if err != nil { + return Block{}, fmt.Errorf("error while fetching object from storage: %w", err) + } + return Block{ + BlockRef: reference, + Data: readCloser, + }, nil } -// TODO zip (archive) blocks before uploading to storage func (b *BloomClient) PutBlocks(ctx context.Context, blocks []Block) ([]Block, error) { results := make([]Block, len(blocks)) //todo move concurrency to the config diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 4c4b6f855a8ec..7267856a43155 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -32,7 +32,7 @@ var ( ) func Test_BloomClient_GetMetas(t *testing.T) { - shipper := createShipper(t) + shipper := createClient(t) var expected []Meta folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory @@ -99,12 +99,12 @@ func Test_BloomClient_PutMeta(t *testing.T) { } for name, data := range tests { t.Run(name, func(t *testing.T) { - shipper := createShipper(t) + bloomClient := createClient(t) - err := shipper.PutMeta(context.Background(), data.source) + err := bloomClient.PutMeta(context.Background(), data.source) require.NoError(t, err) - directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory + directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory filePath := filepath.Join(directory, data.expectedFilePath) require.FileExists(t, filePath) content, err := os.ReadFile(filePath) @@ -155,15 +155,15 @@ func Test_BloomClient_DeleteMeta(t *testing.T) { } for name, data := range tests { t.Run(name, func(t *testing.T) { - shipper := createShipper(t) - directory := shipper.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory + bloomClient := createClient(t) + directory := bloomClient.storageConfig.NamedStores.Filesystem[data.expectedStorage].Directory file := filepath.Join(directory, data.expectedFilePath) err := os.MkdirAll(file[:strings.LastIndex(file, delimiter)], 0755) require.NoError(t, err) err = os.WriteFile(file, []byte("dummy content"), 0700) require.NoError(t, err) - err = shipper.DeleteMeta(context.Background(), data.source) + err = bloomClient.DeleteMeta(context.Background(), data.source) require.NoError(t, err) require.NoFileExists(t, file) @@ -173,8 +173,8 @@ func Test_BloomClient_DeleteMeta(t *testing.T) { } func Test_BloomClient_GetBlocks(t *testing.T) { - shipper := createShipper(t) - fsNamedStores := shipper.storageConfig.NamedStores.Filesystem + bloomClient := createClient(t) + fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem firstBlockPath := "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1" firstBlockFullPath := filepath.Join(fsNamedStores["folder-1"].Directory, firstBlockPath) firstBlockData := createBlockFile(t, firstBlockFullPath) @@ -209,44 +209,21 @@ func Test_BloomClient_GetBlocks(t *testing.T) { BlockPath: secondBlockPath, } - blocksToDownload := []BlockRef{firstBlockRef, secondBlockRef} - - blocksCh, errorsCh := shipper.GetBlocks(context.Background(), blocksToDownload) - blocks := make(map[string]string) - func() { - timout := time.After(5 * time.Second) - for { - select { - case <-timout: - t.Fatalf("the test had to be completed before the timeout") - return - case err := <-errorsCh: - require.NoError(t, err) - case block, ok := <-blocksCh: - if !ok { - return - } - blockData, err := io.ReadAll(block.Data) - require.NoError(t, err) - blocks[block.BlockRef.BlockPath] = string(blockData) - - } - } - }() - - firstBlockActualData, exists := blocks[firstBlockRef.BlockPath] - require.Truef(t, exists, "data for the first block must be present in the results: %+v", blocks) - require.Equal(t, firstBlockData, firstBlockActualData) - - secondBlockActualData, exists := blocks[secondBlockRef.BlockPath] - require.True(t, exists, "data for the second block must be present in the results: %+v", blocks) - require.Equal(t, secondBlockData, secondBlockActualData) + downloadedFirstBlock, err := bloomClient.GetBlock(context.Background(), firstBlockRef) + require.NoError(t, err) + firstBlockActualData, err := io.ReadAll(downloadedFirstBlock.Data) + require.NoError(t, err) + require.Equal(t, firstBlockData, string(firstBlockActualData)) - require.Len(t, blocks, 2) + downloadedSecondBlock, err := bloomClient.GetBlock(context.Background(), secondBlockRef) + require.NoError(t, err) + secondBlockActualData, err := io.ReadAll(downloadedSecondBlock.Data) + require.NoError(t, err) + require.Equal(t, secondBlockData, string(secondBlockActualData)) } func Test_BloomClient_PutBlocks(t *testing.T) { - shipper := createShipper(t) + bloomClient := createClient(t) blockForFirstFolderData := "data1" blockForFirstFolder := Block{ BlockRef: BlockRef{ @@ -281,7 +258,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { Data: aws_io.ReadSeekNopCloser{ReadSeeker: bytes.NewReader([]byte(blockForSecondFolderData))}, } - results, err := shipper.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder}) + results, err := bloomClient.PutBlocks(context.Background(), []Block{blockForFirstFolder, blockForSecondFolder}) require.NoError(t, err) require.Len(t, results, 2) firstResultBlock := results[0] @@ -295,7 +272,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { require.Equal(t, blockForFirstFolder.EndTimestamp, firstResultBlock.EndTimestamp) require.Equal(t, blockForFirstFolder.Checksum, firstResultBlock.Checksum) require.Equal(t, blockForFirstFolder.IndexPath, firstResultBlock.IndexPath) - folder1 := shipper.storageConfig.NamedStores.Filesystem["folder-1"].Directory + folder1 := bloomClient.storageConfig.NamedStores.Filesystem["folder-1"].Directory savedFilePath := filepath.Join(folder1, path) require.FileExists(t, savedFilePath) savedData, err := os.ReadFile(savedFilePath) @@ -313,7 +290,7 @@ func Test_BloomClient_PutBlocks(t *testing.T) { require.Equal(t, blockForSecondFolder.EndTimestamp, secondResultBlock.EndTimestamp) require.Equal(t, blockForSecondFolder.Checksum, secondResultBlock.Checksum) require.Equal(t, blockForSecondFolder.IndexPath, secondResultBlock.IndexPath) - folder2 := shipper.storageConfig.NamedStores.Filesystem["folder-2"].Directory + folder2 := bloomClient.storageConfig.NamedStores.Filesystem["folder-2"].Directory savedFilePath = filepath.Join(folder2, path) require.FileExists(t, savedFilePath) @@ -323,8 +300,8 @@ func Test_BloomClient_PutBlocks(t *testing.T) { } func Test_BloomClient_DeleteBlocks(t *testing.T) { - shipper := createShipper(t) - fsNamedStores := shipper.storageConfig.NamedStores.Filesystem + bloomClient := createClient(t) + fsNamedStores := bloomClient.storageConfig.NamedStores.Filesystem block1Path := filepath.Join(fsNamedStores["folder-1"].Directory, "bloom/first-period-19621/tenantA/blooms/eeee-ffff/1695272400-1695276000-1") createBlockFile(t, block1Path) block2Path := filepath.Join(fsNamedStores["folder-2"].Directory, "bloom/second-period-19624/tenantA/blooms/aaaa-bbbb/1695531600-1695535200-2") @@ -358,7 +335,7 @@ func Test_BloomClient_DeleteBlocks(t *testing.T) { IndexPath: uuid.New().String(), }, } - err := shipper.DeleteBlocks(context.Background(), blocksToDelete) + err := bloomClient.DeleteBlocks(context.Background(), blocksToDelete) require.NoError(t, err) require.NoFileExists(t, block1Path) require.NoFileExists(t, block2Path) @@ -500,7 +477,7 @@ func Test_createMetaRef(t *testing.T) { } } -func createShipper(t *testing.T) *BloomClient { +func createClient(t *testing.T) *BloomClient { periodicConfigs := createPeriodConfigs() namedStores := storage.NamedStores{ Filesystem: map[string]storage.NamedFSConfig{ @@ -513,9 +490,9 @@ func createShipper(t *testing.T) *BloomClient { metrics := storage.NewClientMetrics() t.Cleanup(metrics.Unregister) - bshipper, err := NewBloomClient(periodicConfigs, storageConfig, metrics) + bloomClient, err := NewBloomClient(periodicConfigs, storageConfig, metrics) require.NoError(t, err) - return bshipper + return bloomClient } func createPeriodConfigs() []config.PeriodConfig { diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 7e9ab787ff3ab..748e037ca57c1 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -8,11 +8,23 @@ import ( ) type Config struct { - WorkingDirectory string `yaml:"working_directory"` + WorkingDirectory string `yaml:"working_directory"` + BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"` +} + +type DownloadingQueueConfig struct { + WorkersCount int `yaml:"workers_count"` + MaxTasksEnqueuedPerTenant int `yaml:"max_tasks_enqueued_per_tenant"` +} + +func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 100, "The count of parallel workers that download Bloom Blocks.") + f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of task in queue per tenant per bloom-gateway. Enqueuing the tasks above this limit will fail an error.") } func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.StringVar(&c.WorkingDirectory, prefix+"shipper.working-directory", "bloom-shipper", "Working directory to store downloaded Bloom Blocks.") + c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f) } func (c *Config) Validate() error { diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index 2df1f41cd4a25..98dbbb20a476a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -4,32 +4,38 @@ import ( "cmp" "context" "fmt" - "io" - "os" - "path/filepath" - "strconv" - "strings" "time" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "golang.org/x/exp/slices" - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) type Shipper struct { - client Client - config config.Config - logger log.Logger + client Client + config config.Config + logger log.Logger + blockDownloader *blockDownloader } -func NewShipper(client Client, config config.Config, logger log.Logger) (*Shipper, error) { +type Limits interface { + BloomGatewayBlocksDownloadingParallelism(tenantID string) int +} + +func NewShipper(client Client, config config.Config, limits Limits, logger log.Logger, reg prometheus.Registerer) (*Shipper, error) { + logger = log.With(logger, "component", "bloom-shipper") + downloader, err := newBlockDownloader(config, client, limits, logger, reg) + if err != nil { + return nil, fmt.Errorf("error creating block downloader: %w", err) + } return &Shipper{ - client: client, - config: config, - logger: log.With(logger, "component", "bloom-shipper"), + client: client, + config: config, + logger: logger, + blockDownloader: downloader, }, nil } @@ -47,21 +53,18 @@ func (s *Shipper) ForEachBlock( return fmt.Errorf("error fetching active block references : %w", err) } - blocksChannel, errorsChannel := s.client.GetBlocks(ctx, blockRefs) + cancelContext, cancelFunc := context.WithCancel(ctx) + defer cancelFunc() + blocksChannel, errorsChannel := s.blockDownloader.downloadBlocks(cancelContext, tenantID, blockRefs) for { select { - case block, ok := <-blocksChannel: + case result, ok := <-blocksChannel: if !ok { return nil } - directory, err := s.extractBlock(&block, time.Now().UTC()) + err = callback(result.BlockQuerier) if err != nil { - return fmt.Errorf("error unarchiving block %s err: %w", block.BlockPath, err) - } - blockQuerier := s.createBlockQuerier(directory) - err = callback(blockQuerier) - if err != nil { - return fmt.Errorf("error running callback function for block %s err: %w", block.BlockPath, err) + return fmt.Errorf("error running callback function for block %s err: %w", result.BlockPath, err) } case err := <-errorsChannel: if err != nil { @@ -73,6 +76,7 @@ func (s *Shipper) ForEachBlock( func (s *Shipper) Stop() { s.client.Stop() + s.blockDownloader.stop() } // getFromThrough returns the first and list item of a fingerprint slice @@ -177,55 +181,3 @@ func isOutsideRange(b *BlockRef, startTimestamp, endTimestamp int64, fingerprint } return b.MaxFingerprint < fingerprints[idx] } - -// extract the files into directory and returns absolute path to this directory. -func (s *Shipper) extractBlock(block *Block, ts time.Time) (string, error) { - workingDirectoryPath := filepath.Join(s.config.WorkingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) - err := os.MkdirAll(workingDirectoryPath, os.ModePerm) - if err != nil { - return "", fmt.Errorf("can not create directory to extract the block: %w", err) - } - archivePath, err := writeDataToTempFile(workingDirectoryPath, block) - if err != nil { - return "", fmt.Errorf("error writing data to temp file: %w", err) - } - defer func() { - os.Remove(archivePath) - // todo log err - }() - err = extractArchive(archivePath, workingDirectoryPath) - if err != nil { - return "", fmt.Errorf("error extracting archive: %w", err) - } - return workingDirectoryPath, nil -} - -func (s *Shipper) createBlockQuerier(directory string) *v1.BlockQuerier { - reader := v1.NewDirectoryBlockReader(directory) - block := v1.NewBlock(reader) - return v1.NewBlockQuerier(block) -} - -func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) { - defer block.Data.Close() - archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:]) - - archiveFile, err := os.Create(archivePath) - if err != nil { - return "", fmt.Errorf("error creating empty file to store the archiver: %w", err) - } - defer archiveFile.Close() - _, err = io.Copy(archiveFile, block.Data) - if err != nil { - return "", fmt.Errorf("error writing data to archive file: %w", err) - } - return archivePath, nil -} - -func extractArchive(archivePath string, workingDirectoryPath string) error { - file, err := os.Open(archivePath) - if err != nil { - return fmt.Errorf("error opening archive file %s: %w", file.Name(), err) - } - return v1.UnTarGz(workingDirectoryPath, file) -} diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index 45450c0e3838b..17f21793680ca 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -1,21 +1,11 @@ package bloomshipper import ( - "bytes" "fmt" - "io" "math" - "os" - "path/filepath" - "strconv" "testing" - "time" - "github.com/google/uuid" "github.com/stretchr/testify/require" - - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" - "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) func Test_Shipper_findBlocks(t *testing.T) { @@ -208,61 +198,3 @@ func createBlockRef( BlockPath: blockPath, } } - -const ( - bloomFileName = "bloom" - seriesFileName = "series" -) - -func Test_Shipper_extractBlock(t *testing.T) { - dir := t.TempDir() - - mockBlockDir := filepath.Join(dir, "mock-block-dir") - err := os.MkdirAll(mockBlockDir, 0777) - require.NoError(t, err) - bloomFile, err := os.Create(filepath.Join(mockBlockDir, bloomFileName)) - require.NoError(t, err) - bloomFileContent := uuid.NewString() - _, err = io.Copy(bloomFile, bytes.NewReader([]byte(bloomFileContent))) - require.NoError(t, err) - - seriesFile, err := os.Create(filepath.Join(mockBlockDir, seriesFileName)) - require.NoError(t, err) - seriesFileContent := uuid.NewString() - _, err = io.Copy(seriesFile, bytes.NewReader([]byte(seriesFileContent))) - require.NoError(t, err) - - blockFilePath := filepath.Join(dir, "test-block-archive") - file, err := os.OpenFile(blockFilePath, os.O_CREATE|os.O_RDWR, 0700) - require.NoError(t, err) - err = v1.TarGz(file, v1.NewDirectoryBlockReader(mockBlockDir)) - require.NoError(t, err) - - blockFile, err := os.OpenFile(blockFilePath, os.O_RDONLY, 0700) - require.NoError(t, err) - - workingDir := t.TempDir() - shipper := Shipper{config: config.Config{WorkingDirectory: workingDir}} - ts := time.Now().UTC() - block := Block{ - BlockRef: BlockRef{BlockPath: "first-period-19621/tenantA/metas/ff-fff-1695272400-1695276000-aaa"}, - Data: blockFile, - } - - actualPath, err := shipper.extractBlock(&block, ts) - - require.NoError(t, err) - expectedPath := filepath.Join(workingDir, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10)) - require.Equal(t, expectedPath, actualPath, - "expected archive to be extracted to working directory under the same path as blockPath and with timestamp suffix") - require.FileExists(t, filepath.Join(expectedPath, bloomFileName)) - require.FileExists(t, filepath.Join(expectedPath, seriesFileName)) - - actualBloomFileContent, err := os.ReadFile(filepath.Join(expectedPath, bloomFileName)) - require.NoError(t, err) - require.Equal(t, bloomFileContent, string(actualBloomFileContent)) - - actualSeriesFileContent, err := os.ReadFile(filepath.Join(expectedPath, seriesFileName)) - require.NoError(t, err) - require.Equal(t, seriesFileContent, string(actualSeriesFileContent)) -} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 250fa9575c470..c4e38a898d2c6 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -182,13 +182,14 @@ type Limits struct { BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"` - BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` - BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` - BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` - BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` - BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` - BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` - BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` + BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` + BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"` + BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"` + BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"` + BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"` + BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -309,6 +310,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.") f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.") f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.") + f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -788,6 +790,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int { return o.getOverridesForUser(userID).BloomGatewayShardSize } +func (o *Overrides) BloomGatewayBlocksDownloadingParallelism(userID string) int { + return o.getOverridesForUser(userID).BloomGatewayBlocksDownloadingParallelism +} + func (o *Overrides) BloomGatewayEnabled(userID string) bool { return o.getOverridesForUser(userID).BloomGatewayEnabled }