diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index f6487d5f61d11..88dc6128fb2eb 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -184,7 +184,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error { } -func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.ClosableBlockQuerier, error) { +func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) { // load a series iterator for the gap seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds) if err != nil { @@ -195,7 +195,7 @@ func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Iden if err != nil { return nil, nil, errors.Wrap(err, "failed to get blocks") } - results := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks)) + results := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks)) for _, block := range blocks { results = append(results, block.BlockQuerier()) } diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 341e3977502c3..bf9a0a02387b4 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -74,7 +74,7 @@ type SimpleBloomGenerator struct { chunkLoader ChunkLoader // TODO(owen-d): blocks need not be all downloaded prior. Consider implementing // as an iterator of iterators, where each iterator is a batch of overlapping blocks. - blocks []*bloomshipper.ClosableBlockQuerier + blocks []*bloomshipper.CloseableBlockQuerier // options to build blocks with opts v1.BlockOptions @@ -95,7 +95,7 @@ func NewSimpleBloomGenerator( opts v1.BlockOptions, store v1.Iterator[*v1.Series], chunkLoader ChunkLoader, - blocks []*bloomshipper.ClosableBlockQuerier, + blocks []*bloomshipper.CloseableBlockQuerier, readWriterFn func() (v1.BlockWriter, v1.BlockReader), metrics *Metrics, logger log.Logger, @@ -136,7 +136,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1 var closeErrors multierror.MultiError blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks)) - toClose := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.blocks)) + toClose := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks)) // Close all remaining blocks on exit defer func() { for _, block := range toClose { diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index efc0d70f2020a..c43a4b715a1e7 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -64,9 +64,9 @@ func (dummyChunkLoader) Load(_ context.Context, series *v1.Series) (*ChunkItersB } func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator { - bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks)) + bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks)) for _, b := range blocks { - bqs = append(bqs, &bloomshipper.ClosableBlockQuerier{ + bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockQuerier: v1.NewBlockQuerier(b), }) } diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go index 84e3b0af88643..d39ba61a89613 100644 --- a/pkg/bloomgateway/processor_test.go +++ b/pkg/bloomgateway/processor_test.go @@ -20,7 +20,7 @@ var _ bloomshipper.Store = &dummyStore{} type dummyStore struct { metas []bloomshipper.Meta blocks []bloomshipper.BlockRef - querieres []*bloomshipper.ClosableBlockQuerier + querieres []*bloomshipper.CloseableBlockQuerier } func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) { @@ -48,8 +48,8 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) { func (s *dummyStore) Stop() { } -func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.ClosableBlockQuerier, error) { - result := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.querieres)) +func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) { + result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.querieres)) for _, ref := range refs { for _, bq := range s.querieres { diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index f68562766e83a..5eb9385399d87 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -295,10 +295,10 @@ func TestPartitionRequest(t *testing.T) { } -func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]*bloomshipper.ClosableBlockQuerier, [][]v1.SeriesWithBloom) { +func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBloom) { t.Helper() step := (maxFp - minFp) / model.Fingerprint(numBlocks) - bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, numBlocks) + bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, numBlocks) series := make([][]v1.SeriesWithBloom, 0, numBlocks) for i := 0; i < numBlocks; i++ { fromFp := minFp + (step * model.Fingerprint(i)) @@ -308,7 +308,7 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, throughFp = maxFp } blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through) - bq := &bloomshipper.ClosableBlockQuerier{ + bq := &bloomshipper.CloseableBlockQuerier{ BlockQuerier: blockQuerier, BlockRef: bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ @@ -324,12 +324,12 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, return bqs, series } -func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.ClosableBlockQuerier, [][]v1.SeriesWithBloom) { +func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBloom) { t.Helper() blocks := make([]bloomshipper.BlockRef, 0, n) metas := make([]bloomshipper.Meta, 0, n) - queriers := make([]*bloomshipper.ClosableBlockQuerier, 0, n) + queriers := make([]*bloomshipper.CloseableBlockQuerier, 0, n) series := make([][]v1.SeriesWithBloom, 0, n) step := (maxFp - minFp) / model.Fingerprint(n) @@ -358,7 +358,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, Blocks: []bloomshipper.BlockRef{block}, } blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through) - querier := &bloomshipper.ClosableBlockQuerier{ + querier := &bloomshipper.CloseableBlockQuerier{ BlockQuerier: blockQuerier, BlockRef: block, } @@ -370,12 +370,12 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, return blocks, metas, queriers, series } -func newMockBloomStore(bqs []*bloomshipper.ClosableBlockQuerier) *mockBloomStore { +func newMockBloomStore(bqs []*bloomshipper.CloseableBlockQuerier) *mockBloomStore { return &mockBloomStore{bqs: bqs} } type mockBloomStore struct { - bqs []*bloomshipper.ClosableBlockQuerier + bqs []*bloomshipper.CloseableBlockQuerier // mock how long it takes to serve block queriers delay time.Duration // mock response error when serving block queriers in ForEach @@ -404,7 +404,7 @@ func (s *mockBloomStore) ForEach(_ context.Context, _ string, _ []bloomshipper.B return s.err } - shuffled := make([]*bloomshipper.ClosableBlockQuerier, len(s.bqs)) + shuffled := make([]*bloomshipper.CloseableBlockQuerier, len(s.bqs)) _ = copy(shuffled, s.bqs) rand.Shuffle(len(shuffled), func(i, j int) { @@ -444,7 +444,7 @@ func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.Seri return res } -func createBlockRefsFromBlockData(t *testing.T, tenant string, data []*bloomshipper.ClosableBlockQuerier) []bloomshipper.BlockRef { +func createBlockRefsFromBlockData(t *testing.T, tenant string, data []*bloomshipper.CloseableBlockQuerier) []bloomshipper.BlockRef { t.Helper() res := make([]bloomshipper.BlockRef, 0) for i := range data { diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 9f6fe361cfd32..52899a03fea0a 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -17,13 +17,13 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) -type ClosableBlockQuerier struct { +type CloseableBlockQuerier struct { BlockRef *v1.BlockQuerier close func() error } -func (c *ClosableBlockQuerier) Close() error { +func (c *CloseableBlockQuerier) Close() error { if c.close != nil { return c.close() } @@ -88,9 +88,9 @@ func (b BlockDirectory) Release() error { // BlockQuerier returns a new block querier from the directory. // It increments the counter of active queriers for this directory. // The counter is decreased when the returned querier is closed. -func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier { +func (b BlockDirectory) BlockQuerier() *CloseableBlockQuerier { b.Acquire() - return &ClosableBlockQuerier{ + return &CloseableBlockQuerier{ BlockQuerier: v1.NewBlockQuerier(b.Block()), BlockRef: b.BlockRef, close: b.Release, diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher.go b/pkg/storage/stores/shipper/bloomshipper/fetcher.go index 31ae1e2a00f00..bb9a70644f5e6 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher.go @@ -21,7 +21,7 @@ type metrics struct{} type fetcher interface { FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error) - FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) + FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) Close() } @@ -124,7 +124,7 @@ func (f *Fetcher) writeBackMetas(ctx context.Context, metas []Meta) error { return f.metasCache.Store(ctx, keys, data) } -func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) { +func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) { n := len(refs) responses := make(chan downloadResponse[BlockDirectory], n) @@ -140,7 +140,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*Closable }) } - results := make([]*ClosableBlockQuerier, n) + results := make([]*CloseableBlockQuerier, n) for i := 0; i < n; i++ { select { case err := <-errors: diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index ccfce5765d754..c95d04122117f 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -25,7 +25,7 @@ var ( type Store interface { ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error) FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error) - FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) + FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) Fetcher(ts model.Time) (*Fetcher, error) Client(ts model.Time) (Client, error) Stop() @@ -112,7 +112,7 @@ func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParam } // FetchBlocks implements Store. -func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) { +func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) { return b.fetcher.FetchBlocks(ctx, refs) } @@ -291,7 +291,7 @@ func (b *BloomStore) FetchMetas(ctx context.Context, params MetaSearchParams) ([ } // FetchBlocks implements Store. -func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*ClosableBlockQuerier, error) { +func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*CloseableBlockQuerier, error) { var refs [][]BlockRef var fetchers []*Fetcher @@ -316,7 +316,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo } } - results := make([]*ClosableBlockQuerier, 0, len(blocks)) + results := make([]*CloseableBlockQuerier, 0, len(blocks)) for i := range fetchers { res, err := fetchers[i].FetchBlocks(ctx, refs[i]) results = append(results, res...) @@ -326,7 +326,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo } // sort responses (results []*CloseableBlockQuerier) based on requests (blocks []BlockRef) - slices.SortFunc(results, func(a, b *ClosableBlockQuerier) int { + slices.SortFunc(results, func(a, b *CloseableBlockQuerier) int { ia, ib := slices.Index(blocks, a.BlockRef), slices.Index(blocks, b.BlockRef) if ia < ib { return -1