bloom blocks downloading queue #11201

Merged · 14 commits · Nov 24, 2023

14 changes: 14 additions & 0 deletions docs/sources/configure/_index.md
@@ -2248,6 +2248,16 @@ bloom_shipper:
# Working directory to store downloaded Bloom Blocks.
# CLI flag: -bloom.shipper.working-directory
[working_directory: <string> | default = "bloom-shipper"]

blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 100]

# Maximum number of tasks in the queue per tenant per bloom-gateway. Enqueuing
# tasks above this limit will fail with an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]
```
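
For illustration, a `bloom_shipper` block that tunes these new settings might look like the following (values are examples, not recommendations):

```yaml
bloom_shipper:
  working_directory: "/var/loki/bloom-shipper"
  blocks_downloading_queue:
    # halve the default worker pool
    workers_count: 50
    # reject new download tasks once a tenant has 5000 queued
    max_tasks_enqueued_per_tenant: 5000
```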

### chunk_store_config
@@ -2990,6 +3000,10 @@ shard_streams:
# CLI flag: -bloom-compactor.false-positive-rate
[bloom_false_positive_rate: <float> | default = 0.01]

# Maximum number of blocks that will be downloaded in parallel by the Bloom Gateway.
# CLI flag: -bloom-gateway.blocks-downloading-parallelism
[bloom_gateway_blocks_downloading_parallelism: <int> | default = 50]

# Allow user to send structured metadata in push payload.
# CLI flag: -validation.allow-structured-metadata
[allow_structured_metadata: <boolean> | default = false]
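
This limit sits in the `limits_config` block alongside the other per-tenant settings shown above, so lowering it for all tenants would presumably look like this (a sketch; the value is illustrative):

```yaml
limits_config:
  # allow at most 25 bloom blocks to be downloaded in parallel per tenant
  bloom_gateway_blocks_downloading_parallelism: 25
```
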
4 changes: 2 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
@@ -187,7 +187,7 @@ type Gateway struct {
}

// New returns a new instance of the Bloom Gateway.
-func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
+func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, shardingStrategy ShardingStrategy, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
g := &Gateway{
cfg: cfg,
logger: logger,
@@ -205,7 +205,7 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, s
return nil, err
}

-bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, logger)
+bloomShipper, err := bloomshipper.NewShipper(client, storageCfg.BloomShipperConfig, overrides, logger, reg)
if err != nil {
return nil, err
}
26 changes: 22 additions & 4 deletions pkg/bloomgateway/bloomgateway_test.go
@@ -82,7 +82,7 @@ func TestBloomGateway_StartStopService(t *testing.T) {
},
}

-gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
+gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) {
reg := prometheus.NewRegistry()
-gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
+gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -188,7 +188,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("returns error if chunk refs do not belong to tenant", func(t *testing.T) {
reg := prometheus.NewRegistry()
-gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
+gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

ts, _ := time.Parse("2006-01-02 15:04", "2023-10-03 10:00")
@@ -212,7 +212,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {

t.Run("gateway tracks active users", func(t *testing.T) {
reg := prometheus.NewRegistry()
-gw, err := New(cfg, schemaCfg, storageCfg, ss, cm, logger, reg)
+gw, err := New(cfg, schemaCfg, storageCfg, fakeLimits{}, ss, cm, logger, reg)
require.NoError(t, err)

err = services.StartAndAwaitRunning(context.Background(), gw)
@@ -248,3 +248,21 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
require.ElementsMatch(t, tenants, gw.activeUsers.ActiveUsers())
})
}

type fakeLimits struct {
}

func (f fakeLimits) BloomGatewayShardSize(_ string) int {
//TODO implement me
panic("implement me")
Comment on lines +256 to +257 (Contributor). Suggested change:
-//TODO implement me
-panic("implement me")
+return 0

}

func (f fakeLimits) BloomGatewayEnabled(_ string) bool {
//TODO implement me
panic("implement me")
Comment on lines +261 to +262 (Contributor). Suggested change:
-//TODO implement me
-panic("implement me")
+return true

}

func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int {
//TODO implement me
panic("implement me")
Comment on lines +266 to +267 (Contributor). Suggested change:
-//TODO implement me
-panic("implement me")
+return 1

}
1 change: 1 addition & 0 deletions pkg/bloomgateway/sharding.go
@@ -38,6 +38,7 @@ var (
type Limits interface {
BloomGatewayShardSize(tenantID string) int
BloomGatewayEnabled(tenantID string) bool
BloomGatewayBlocksDownloadingParallelism(userID string) int
}

type ShardingStrategy interface {
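
Applying the review suggestions above, the test stub satisfies this `Limits` interface without panicking; a minimal sketch:

```go
type fakeLimits struct{}

func (f fakeLimits) BloomGatewayShardSize(_ string) int                    { return 0 }
func (f fakeLimits) BloomGatewayEnabled(_ string) bool                     { return true }
func (f fakeLimits) BloomGatewayBlocksDownloadingParallelism(_ string) int { return 1 }
```
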
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -1263,7 +1263,7 @@ func (t *Loki) initBloomGateway() (services.Service, error) {

shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger)

-gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
+gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, t.Overrides, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
8 changes: 4 additions & 4 deletions pkg/storage/bloom/v1/block_writer.go
@@ -12,8 +12,8 @@ import (
)

const (
-bloomFileName = "bloom"
-seriesFileName = "series"
+BloomFileName = "bloom"
+SeriesFileName = "series"
)

type BlockWriter interface {
@@ -66,12 +66,12 @@ func (b *DirectoryBlockWriter) Init() error {
return errors.Wrap(err, "creating bloom block dir")
}

-b.index, err = os.Create(filepath.Join(b.dir, seriesFileName))
+b.index, err = os.Create(filepath.Join(b.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "creating series file")
}

-b.blooms, err = os.Create(filepath.Join(b.dir, bloomFileName))
+b.blooms, err = os.Create(filepath.Join(b.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "creating bloom file")
}
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/reader.go
@@ -49,12 +49,12 @@ func NewDirectoryBlockReader(dir string) *DirectoryBlockReader {
func (r *DirectoryBlockReader) Init() error {
if !r.initialized {
var err error
-r.index, err = os.Open(filepath.Join(r.dir, seriesFileName))
+r.index, err = os.Open(filepath.Join(r.dir, SeriesFileName))
if err != nil {
return errors.Wrap(err, "opening series file")
}

-r.blooms, err = os.Open(filepath.Join(r.dir, bloomFileName))
+r.blooms, err = os.Open(filepath.Join(r.dir, BloomFileName))
if err != nil {
return errors.Wrap(err, "opening bloom file")
}
230 changes: 230 additions & 0 deletions pkg/storage/stores/shipper/bloomshipper/block_downloader.go
@@ -0,0 +1,230 @@
package bloomshipper

import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/constants"
)

type blockDownloader struct {
logger log.Logger

workingDirectory string
queueMetrics *queue.Metrics
queue *queue.RequestQueue
blockClient BlockClient
limits Limits
activeUsersService *util.ActiveUsersCleanupService

ctx context.Context
manager *services.Manager
onWorkerStopCallback func()
}

func newBlockDownloader(config config.Config, blockClient BlockClient, limits Limits, logger log.Logger, reg prometheus.Registerer) (*blockDownloader, error) {
queueMetrics := queue.NewMetrics(reg, constants.Loki, "bloom_blocks_downloader")
// the active users service cleans up per-tenant queue metrics for tenants that become inactive
downloadingQueue := queue.NewRequestQueue(config.BlocksDownloadingQueue.MaxTasksEnqueuedPerTenant, time.Minute, queueMetrics)
activeUsersService := util.NewActiveUsersCleanupWithDefaultValues(queueMetrics.Cleanup)

ctx := context.Background()
manager, err := services.NewManager(downloadingQueue, activeUsersService)
if err != nil {
return nil, fmt.Errorf("error creating service manager: %w", err)
}
err = services.StartManagerAndAwaitHealthy(ctx, manager)
if err != nil {
return nil, fmt.Errorf("error starting service manager: %w", err)
}

b := &blockDownloader{
ctx: ctx,
logger: logger,
workingDirectory: config.WorkingDirectory,
queueMetrics: queueMetrics,
queue: downloadingQueue,
blockClient: blockClient,
activeUsersService: activeUsersService,
limits: limits,
manager: manager,
onWorkerStopCallback: onWorkerStopNoopCallback,
}

for i := 0; i < config.BlocksDownloadingQueue.WorkersCount; i++ {
go b.serveDownloadingTasks(fmt.Sprintf("worker-%d", i))
}
return b, nil
}

type BlockDownloadingTask struct {
ctx context.Context
block BlockRef
// ErrCh is a send-only channel to write an error to
ErrCh chan<- error
// ResultsCh is a send-only channel to return the block querier for the downloaded block
ResultsCh chan<- blockWithQuerier
}

func NewBlockDownloadingTask(ctx context.Context, block BlockRef, resCh chan<- blockWithQuerier, errCh chan<- error) *BlockDownloadingTask {
return &BlockDownloadingTask{
ctx: ctx,
block: block,
ErrCh: errCh,
ResultsCh: resCh,
}
}

// noop implementation
var onWorkerStopNoopCallback = func() {}

func (d *blockDownloader) serveDownloadingTasks(workerID string) {
logger := log.With(d.logger, "worker", workerID)
level.Debug(logger).Log("msg", "starting worker")

d.queue.RegisterConsumerConnection(workerID)
defer d.queue.UnregisterConsumerConnection(workerID)
// this callback is used only in tests to assert that the worker has stopped
defer d.onWorkerStopCallback()

idx := queue.StartIndexWithLocalQueue

for {
item, newIdx, err := d.queue.Dequeue(d.ctx, idx, workerID)
if err != nil {
if !errors.Is(err, queue.ErrStopped) && !errors.Is(err, context.Canceled) {
level.Error(logger).Log("msg", "failed to dequeue task", "err", err)
continue
}
level.Info(logger).Log("msg", "stopping worker")
return
}
task, ok := item.(*BlockDownloadingTask)
if !ok {
level.Error(logger).Log("msg", "failed to cast to BlockDownloadingTask", "item", fmt.Sprintf("%+v", item), "type", fmt.Sprintf("%T", item))
continue
}

idx = newIdx
blockPath := task.block.BlockPath
// TODO: add a cache lookup before downloading
level.Debug(logger).Log("msg", "start downloading the block", "block", blockPath)
block, err := d.blockClient.GetBlock(task.ctx, task.block)
if err != nil {
level.Error(logger).Log("msg", "error downloading the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error downloading the block %s : %w", blockPath, err)
continue
}
directory, err := d.extractBlock(&block, time.Now())
if err != nil {
level.Error(logger).Log("msg", "error extracting the block", "block", blockPath, "err", err)
task.ErrCh <- fmt.Errorf("error extracting the block %s : %w", blockPath, err)
continue
}
level.Debug(d.logger).Log("msg", "block has been downloaded and extracted", "block", task.block.BlockPath, "directory", directory)
blockQuerier := d.createBlockQuerier(directory)
task.ResultsCh <- blockWithQuerier{
BlockRef: task.block,
BlockQuerier: blockQuerier,
}
}
}

func (d *blockDownloader) downloadBlocks(ctx context.Context, tenantID string, references []BlockRef) (chan blockWithQuerier, chan error) {
d.activeUsersService.UpdateUserTimestamp(tenantID, time.Now())
// errCh must be buffered with capacity for the maximum possible number of errors:
// otherwise a queue worker could report an error to this channel before the current
// goroutine does, and this goroutine would deadlock because it could not report its
// own error while nothing is reading the channel at that moment.
errCh := make(chan error, len(references))
Comment on lines +151 to +155 (Contributor):

Not sure if I understand that correctly. On line 168 you return after sending the first and only error to the errCh. Could you just make a buffered channel of size 1?

Reply (Contributor, Author):

This channel is also passed to the queue, and the queue workers will report errors to it as well.

blocksCh := make(chan blockWithQuerier, len(references))

downloadingParallelism := d.limits.BloomGatewayBlocksDownloadingParallelism(tenantID)
for _, reference := range references {
task := NewBlockDownloadingTask(ctx, reference, blocksCh, errCh)
level.Debug(d.logger).Log("msg", "enqueuing task to download block", "block", reference.BlockPath)
err := d.queue.Enqueue(tenantID, nil, task, downloadingParallelism, nil)
if err != nil {
errCh <- fmt.Errorf("error enquing downloading task for block %s : %w", reference.BlockPath, err)
return blocksCh, errCh
}
}
return blocksCh, errCh
}
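
To see why `errCh` is buffered to `len(references)`, consider a hypothetical consumer that returns on the first error (the real caller is not part of this diff; names are illustrative). Buffering lets every remaining worker still send its error and move on instead of blocking forever:

```go
// consumeBlocks is a hypothetical caller of downloadBlocks, shown only to
// illustrate the channel-buffering contract; it is not part of this PR.
func consumeBlocks(ctx context.Context, d *blockDownloader, tenantID string, refs []BlockRef) error {
	blocksCh, errCh := d.downloadBlocks(ctx, tenantID, refs)
	for i := 0; i < len(refs); i++ {
		select {
		case err := <-errCh:
			// return on the first error; workers that fail later can still
			// buffer their errors into errCh and continue serving other tasks
			return err
		case block := <-blocksCh:
			_ = block.BlockQuerier // query the downloaded block here
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
```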

type blockWithQuerier struct {
BlockRef
*v1.BlockQuerier
}

// extractBlock extracts the block archive into a working directory and returns the absolute path to that directory.
func (d *blockDownloader) extractBlock(block *Block, ts time.Time) (string, error) {
workingDirectoryPath := filepath.Join(d.workingDirectory, block.BlockPath, strconv.FormatInt(ts.UnixMilli(), 10))
err := os.MkdirAll(workingDirectoryPath, os.ModePerm)
if err != nil {
return "", fmt.Errorf("can not create directory to extract the block: %w", err)
}
archivePath, err := writeDataToTempFile(workingDirectoryPath, block)
if err != nil {
return "", fmt.Errorf("error writing data to temp file: %w", err)
}
defer func() {
os.Remove(archivePath)
// TODO: log the error returned by os.Remove
}()
err = extractArchive(archivePath, workingDirectoryPath)
if err != nil {
return "", fmt.Errorf("error extracting archive: %w", err)
}
return workingDirectoryPath, nil
}

func (d *blockDownloader) createBlockQuerier(directory string) *v1.BlockQuerier {
reader := v1.NewDirectoryBlockReader(directory)
block := v1.NewBlock(reader)
return v1.NewBlockQuerier(block)
}

func (d *blockDownloader) stop() {
_ = services.StopManagerAndAwaitStopped(d.ctx, d.manager)
Comment (Contributor):

Maybe we should log the error?
}
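
Following the reviewer's note, a variant of `stop` that logs the error could look like this (a sketch; the merged code discards the error):

```go
func (d *blockDownloader) stop() {
	if err := services.StopManagerAndAwaitStopped(d.ctx, d.manager); err != nil {
		level.Error(d.logger).Log("msg", "error stopping block downloader", "err", err)
	}
}
```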

func writeDataToTempFile(workingDirectoryPath string, block *Block) (string, error) {
defer block.Data.Close()
archivePath := filepath.Join(workingDirectoryPath, block.BlockPath[strings.LastIndex(block.BlockPath, delimiter)+1:])

archiveFile, err := os.Create(archivePath)
if err != nil {
return "", fmt.Errorf("error creating empty file to store the archiver: %w", err)
}
defer archiveFile.Close()
_, err = io.Copy(archiveFile, block.Data)
if err != nil {
return "", fmt.Errorf("error writing data to archive file: %w", err)
}
return archivePath, nil
}

func extractArchive(archivePath string, workingDirectoryPath string) error {
file, err := os.Open(archivePath)
if err != nil {
return fmt.Errorf("error opening archive file %s: %w", file.Name(), err)
}
return v1.UnTarGz(workingDirectoryPath, file)
}