Limit bloom block size (#11878)
**What this PR does / why we need it**:
This PR limits the size of the blocks created by the compactor.

The block builder keeps adding series' blooms to a block until the limit
is exceeded, meaning that blocks may grow slightly beyond the configured
maximum. This is deliberate: it avoids leaving tiny blocks that still had
room for small blooms but where a later, bigger bloom did not fit.

Blocks are built lazily: the generator returns an iterator that builds
one block at a time.
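
To make the sizing rule concrete, here is a minimal, self-contained sketch of the "add until exceeded" policy. All names here (`bloom`, `fillBlock`) are hypothetical stand-ins for illustration, not the actual `v1.BlockBuilder` API:

```go
package main

import "fmt"

// bloom is a stand-in for a series' bloom filter; only its size matters here.
type bloom struct{ size int }

// fillBlock adds blooms to a block until the running size *exceeds* the
// limit, then returns the leftovers for the next block. The final bloom is
// kept, so a block may end up slightly over maxSize, but no block is left
// tiny just because a later, bigger bloom didn't fit.
func fillBlock(blooms []bloom, maxSize int) (taken, rest []bloom) {
	total := 0
	for i, b := range blooms {
		total += b.size
		if total > maxSize {
			return blooms[:i+1], blooms[i+1:]
		}
	}
	return blooms, nil
}

func main() {
	blooms := []bloom{{30}, {30}, {50}, {10}}
	for len(blooms) > 0 {
		var block []bloom
		block, blooms = fillBlock(blooms, 60)
		fmt.Println("built block with", len(block), "blooms")
	}
}
```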

**Special notes for your reviewer**:
The maximum size is currently hardcoded to 50 MB. We will make this
configurable in a follow-up PR.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Co-authored-by: Christian Haudum <[email protected]>
salvacorts and chaudum authored Feb 12, 2024
1 parent 57619b7 commit 681bb57
Showing 8 changed files with 234 additions and 94 deletions.
20 changes: 18 additions & 2 deletions pkg/bloomcompactor/controller.go
@@ -8,6 +8,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/pkg/errors"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
@@ -134,10 +135,20 @@ func (s *SimpleBloomController) buildBlocks(
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return errors.Wrap(err, "failed to get series and blocks")
}
// Close all remaining blocks on exit
closePreExistingBlocks := func() {
var closeErrors multierror.MultiError
for _, block := range preExistingBlocks {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}

gen := NewSimpleBloomGenerator(
tenant,
v1.DefaultBlockOptions,
v1.DefaultBlockOptions, // TODO(salvacorts) make block options configurable
seriesItr,
s.chunkLoader,
preExistingBlocks,
@@ -150,13 +161,14 @@
if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to generate bloom")
}

client, err := s.bloomStore.Client(table.ModelTime())

if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to get client")
}
for newBlocks.Next() {
@@ -168,16 +180,20 @@
bloomshipper.BlockFrom(tenant, table.String(), blk),
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to write block")
}
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
closePreExistingBlocks()
return errors.Wrap(err, "failed to generate bloom")
}

// Close pre-existing blocks
closePreExistingBlocks()
}
}

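The `closePreExistingBlocks` helper above uses dskit's `multierror` so that every source block gets a `Close` attempt even when earlier closes fail, with all failures reported as one aggregated error. A standalone sketch of the same pattern, with plain `io.Closer`s standing in for the block queriers (assumes the `github.com/grafana/dskit` module is available):

```go
package main

import (
	"fmt"
	"io"

	"github.com/grafana/dskit/multierror"
)

// closeAll attempts to close every closer rather than stopping at the
// first failure, then reports the collected errors as a single error.
func closeAll(closers []io.Closer) error {
	var errs multierror.MultiError
	for _, c := range closers {
		errs.Add(c.Close()) // nil errors are ignored by Add
	}
	return errs.Err() // nil if nothing failed
}

func main() {
	if err := closeAll(nil); err != nil {
		fmt.Println("failed to close blocks:", err)
	}
}
```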
113 changes: 79 additions & 34 deletions pkg/bloomcompactor/spec.go
@@ -11,8 +11,6 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/multierror"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
@@ -110,67 +108,114 @@ func (s *SimpleBloomGenerator) populator(ctx context.Context) func(series *v1.Se
}

func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1.BlockMetadata, results v1.Iterator[*v1.Block], err error) {

var closeErrors multierror.MultiError
blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks))
toClose := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
// Close all remaining blocks on exit
defer func() {
for _, block := range toClose {
closeErrors.Add(block.Close())
}
if err := closeErrors.Err(); err != nil {
level.Error(s.logger).Log("msg", "failed to close blocks", "err", err)
}
}()

blocksMatchingSchema := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
for _, block := range s.blocks {
logger := log.With(s.logger, "block", block.BlockRef)
md, err := block.Metadata()
schema := md.Options.Schema
if err != nil {
level.Warn(logger).Log("msg", "failed to get schema for block", "err", err)
skippedBlocks = append(skippedBlocks, md)

// Close unneeded block
closeErrors.Add(block.Close())
continue
}

if !s.opts.Schema.Compatible(schema) {
level.Warn(logger).Log("msg", "block schema incompatible with options", "generator_schema", fmt.Sprintf("%+v", s.opts.Schema), "block_schema", fmt.Sprintf("%+v", schema))
skippedBlocks = append(skippedBlocks, md)

// Close unneeded block
closeErrors.Add(block.Close())
continue
}

level.Debug(logger).Log("msg", "adding compatible block to bloom generation inputs")
itr := v1.NewPeekingIter[*v1.SeriesWithBloom](block)
blocksMatchingSchema = append(blocksMatchingSchema, itr)
// append needed block to close list (when finished)
toClose = append(toClose, block)
blocksMatchingSchema = append(blocksMatchingSchema, block)
}

level.Debug(s.logger).Log("msg", "generating bloom filters for blocks", "num_blocks", len(blocksMatchingSchema), "skipped_blocks", len(skippedBlocks), "schema", fmt.Sprintf("%+v", s.opts.Schema))

// TODO(owen-d): implement bounded block sizes
mergeBuilder := v1.NewMergeBuilder(blocksMatchingSchema, s.store, s.populator(ctx))
writer, reader := s.readWriterFn()
series := v1.NewPeekingIter(s.store)
blockIter := NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, blocksMatchingSchema)
return skippedBlocks, blockIter, nil
}

blockBuilder, err := v1.NewBlockBuilder(v1.NewBlockOptionsFromSchema(s.opts.Schema), writer)
if err != nil {
return skippedBlocks, nil, errors.Wrap(err, "failed to create bloom block builder")
// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
populate func(*v1.Series, *v1.Bloom) error
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
blocks []*bloomshipper.CloseableBlockQuerier

blocksAsPeekingIter []v1.PeekingIterator[*v1.SeriesWithBloom]
curr *v1.Block
err error
}

func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
populate func(*v1.Series, *v1.Bloom) error,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
blocks []*bloomshipper.CloseableBlockQuerier,
) *LazyBlockBuilderIterator {
it := &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
populate: populate,
readWriterFn: readWriterFn,
series: series,
blocks: blocks,

blocksAsPeekingIter: make([]v1.PeekingIterator[*v1.SeriesWithBloom], len(blocks)),
}

return it
}

func (b *LazyBlockBuilderIterator) Next() bool {
// No more series to process
if _, hasNext := b.series.Peek(); !hasNext {
return false
}

// reset all the blocks to the start
for i, block := range b.blocks {
if err := block.Reset(); err != nil {
b.err = errors.Wrapf(err, "failed to reset block iterator %d", i)
return false
}
b.blocksAsPeekingIter[i] = v1.NewPeekingIter[*v1.SeriesWithBloom](block)
}

if err := b.ctx.Err(); err != nil {
b.err = errors.Wrap(err, "context canceled")
return false
}

mergeBuilder := v1.NewMergeBuilder(b.blocksAsPeekingIter, b.series, b.populate)
writer, reader := b.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
b.err = errors.Wrap(err, "failed to create bloom block builder")
return false
}
_, err = mergeBuilder.Build(blockBuilder)
if err != nil {
return skippedBlocks, nil, errors.Wrap(err, "failed to build bloom block")
b.err = errors.Wrap(err, "failed to build bloom block")
return false
}

return skippedBlocks, v1.NewSliceIter[*v1.Block]([]*v1.Block{v1.NewBlock(reader)}), nil
b.curr = v1.NewBlock(reader)
return true
}

func (b *LazyBlockBuilderIterator) At() *v1.Block {
return b.curr
}

func (b *LazyBlockBuilderIterator) Err() error {
return b.err
}

// IndexLoader loads an index. This helps us do things like
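The new `LazyBlockBuilderIterator` follows the same `Next`/`At`/`Err` contract as the other `v1` iterators: all of the work happens inside `Next`, `At` returns the most recently built block, and `Err` surfaces a terminal failure after iteration stops. A toy, self-contained illustration of that contract, with ints standing in for blocks:

```go
package main

import "fmt"

// lazyIter mimics the Next/At/Err protocol used by
// LazyBlockBuilderIterator: each Next call "builds" one value on demand,
// so only one built value is held in memory at a time.
type lazyIter struct {
	inputs []int
	curr   int
	err    error
}

func (it *lazyIter) Next() bool {
	if it.err != nil || len(it.inputs) == 0 {
		return false
	}
	// Build the next value lazily, one per Next call.
	it.curr, it.inputs = it.inputs[0]*10, it.inputs[1:]
	return true
}

func (it *lazyIter) At() int    { return it.curr }
func (it *lazyIter) Err() error { return it.err }

func main() {
	it := &lazyIter{inputs: []int{1, 2, 3}}
	for it.Next() {
		fmt.Println("built block", it.At())
	}
	if err := it.Err(); err != nil {
		fmt.Println("iteration failed:", err)
	}
}
```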
62 changes: 38 additions & 24 deletions pkg/bloomcompactor/spec_test.go
@@ -28,7 +28,7 @@ func blocksFromSchemaWithRange(t *testing.T, n int, options v1.BlockOptions, fro
numKeysPerSeries := 10000
data, _ = v1.MkBasicSeriesWithBlooms(numSeries, numKeysPerSeries, fromFP, throughFp, 0, 10000)

seriesPerBlock := 100 / n
seriesPerBlock := numSeries / n

for i := 0; i < n; i++ {
// references for linking in memory reader+writer
@@ -88,24 +88,35 @@ func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks [
}

func TestSimpleBloomGenerator(t *testing.T) {
const maxBlockSize = 100 << 20 // 100MB
for _, tc := range []struct {
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped int
desc string
fromSchema, toSchema v1.BlockOptions
sourceBlocks, numSkipped, outputBlocks int
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(3, 0),
toSchema: v1.NewBlockOptions(4, 0),
fromSchema: v1.NewBlockOptions(3, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
sourceBlocks: 2,
numSkipped: 2,
outputBlocks: 1,
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(4, 0),
toSchema: v1.NewBlockOptions(4, 0),
fromSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
sourceBlocks: 2,
numSkipped: 0,
outputBlocks: 1,
},
{
desc: "MaxBlockSize",
fromSchema: v1.NewBlockOptions(4, 0, maxBlockSize),
toSchema: v1.NewBlockOptions(4, 0, 1<<10), // 1KB
sourceBlocks: 2,
numSkipped: 0,
outputBlocks: 3,
},
} {
t.Run(tc.desc, func(t *testing.T) {
@@ -122,22 +133,25 @@ func TestSimpleBloomGenerator(t *testing.T) {
require.Nil(t, err)
require.Equal(t, tc.numSkipped, len(skipped))

require.True(t, results.Next())
block := results.At()
require.False(t, results.Next())

refs := v1.PointerSlice[v1.SeriesWithBloom](data)

v1.EqualIterators[*v1.SeriesWithBloom](
t,
func(a, b *v1.SeriesWithBloom) {
// TODO(owen-d): better equality check
// once chunk fetching is implemented
require.Equal(t, a.Series, b.Series)
},
v1.NewSliceIter[*v1.SeriesWithBloom](refs),
block.Querier(),
)
var outputBlocks []*v1.Block
for results.Next() {
outputBlocks = append(outputBlocks, results.At())
}
require.Equal(t, tc.outputBlocks, len(outputBlocks))

// Check all the input series are present in the output blocks.
expectedRefs := v1.PointerSlice(data)
outputRefs := make([]*v1.SeriesWithBloom, 0, len(data))
for _, block := range outputBlocks {
bq := block.Querier()
for bq.Next() {
outputRefs = append(outputRefs, bq.At())
}
}
require.Equal(t, len(expectedRefs), len(outputRefs))
for i := range expectedRefs {
require.Equal(t, expectedRefs[i].Series, outputRefs[i].Series)
}
})
}
}
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/block.go
@@ -135,6 +135,10 @@ func (bq *BlockQuerier) Schema() (Schema, error) {
return bq.block.Schema()
}

func (bq *BlockQuerier) Reset() error {
return bq.series.Seek(0)
}

func (bq *BlockQuerier) Seek(fp model.Fingerprint) error {
return bq.series.Seek(fp)
}
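`Reset` rewinds the querier by seeking the underlying series iterator back to fingerprint 0, the minimum value, which is what lets `LazyBlockBuilderIterator.Next` re-read every source block once per output block. A hedged fragment of the intended call pattern (assumes `bq` is a `*bloomshipper.CloseableBlockQuerier` in scope; error handling abbreviated):

```go
// Rewind the source block after a full pass so the next output block can
// merge the same series again.
if err := bq.Reset(); err != nil {
	return err
}
// Rebuild a fresh peeking iterator after each reset, as
// LazyBlockBuilderIterator.Next does, since the old one may still hold a
// stale peeked value.
itr := v1.NewPeekingIter[*v1.SeriesWithBloom](bq)
_ = itr
```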