Skip to content

Commit

Permalink
Rename ClosableBlockQuerier -> CloseableBlockQuerier
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum committed Feb 6, 2024
1 parent 59d70e1 commit e83e092
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 32 deletions.
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error {

}

func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.ClosableBlockQuerier, error) {
func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Identifier, gap gapWithBlocks) (v1.CloseableIterator[*v1.Series], []*bloomshipper.CloseableBlockQuerier, error) {
// load a series iterator for the gap
seriesItr, err := s.tsdbStore.LoadTSDB(id, gap.bounds)
if err != nil {
Expand All @@ -195,7 +195,7 @@ func (s *SimpleBloomController) loadWorkForGap(ctx context.Context, id tsdb.Iden
if err != nil {
return nil, nil, errors.Wrap(err, "failed to get blocks")
}
results := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks))
results := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks))
for _, block := range blocks {
results = append(results, block.BlockQuerier())
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type SimpleBloomGenerator struct {
chunkLoader ChunkLoader
// TODO(owen-d): blocks need not be all downloaded prior. Consider implementing
// as an iterator of iterators, where each iterator is a batch of overlapping blocks.
blocks []*bloomshipper.ClosableBlockQuerier
blocks []*bloomshipper.CloseableBlockQuerier

// options to build blocks with
opts v1.BlockOptions
Expand All @@ -95,7 +95,7 @@ func NewSimpleBloomGenerator(
opts v1.BlockOptions,
store v1.Iterator[*v1.Series],
chunkLoader ChunkLoader,
blocks []*bloomshipper.ClosableBlockQuerier,
blocks []*bloomshipper.CloseableBlockQuerier,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
metrics *Metrics,
logger log.Logger,
Expand Down Expand Up @@ -136,7 +136,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) (skippedBlocks []v1

var closeErrors multierror.MultiError
blocksMatchingSchema := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(s.blocks))
toClose := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.blocks))
toClose := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.blocks))
// Close all remaining blocks on exit
defer func() {
for _, block := range toClose {
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (dummyChunkLoader) Load(_ context.Context, series *v1.Series) (*ChunkItersB
}

func dummyBloomGen(opts v1.BlockOptions, store v1.Iterator[*v1.Series], blocks []*v1.Block) *SimpleBloomGenerator {
bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, len(blocks))
bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, len(blocks))
for _, b := range blocks {
bqs = append(bqs, &bloomshipper.ClosableBlockQuerier{
bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b),
})
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var _ bloomshipper.Store = &dummyStore{}
type dummyStore struct {
metas []bloomshipper.Meta
blocks []bloomshipper.BlockRef
querieres []*bloomshipper.ClosableBlockQuerier
querieres []*bloomshipper.CloseableBlockQuerier
}

func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
Expand Down Expand Up @@ -48,8 +48,8 @@ func (s *dummyStore) Client(_ model.Time) (bloomshipper.Client, error) {
func (s *dummyStore) Stop() {
}

func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.ClosableBlockQuerier, error) {
result := make([]*bloomshipper.ClosableBlockQuerier, 0, len(s.querieres))
func (s *dummyStore) FetchBlocks(_ context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
result := make([]*bloomshipper.CloseableBlockQuerier, 0, len(s.querieres))

for _, ref := range refs {
for _, bq := range s.querieres {
Expand Down
20 changes: 10 additions & 10 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,10 @@ func TestPartitionRequest(t *testing.T) {

}

func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]*bloomshipper.ClosableBlockQuerier, [][]v1.SeriesWithBloom) {
func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBloom) {
t.Helper()
step := (maxFp - minFp) / model.Fingerprint(numBlocks)
bqs := make([]*bloomshipper.ClosableBlockQuerier, 0, numBlocks)
bqs := make([]*bloomshipper.CloseableBlockQuerier, 0, numBlocks)
series := make([][]v1.SeriesWithBloom, 0, numBlocks)
for i := 0; i < numBlocks; i++ {
fromFp := minFp + (step * model.Fingerprint(i))
Expand All @@ -308,7 +308,7 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time,
throughFp = maxFp
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
bq := &bloomshipper.ClosableBlockQuerier{
bq := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: blockQuerier,
BlockRef: bloomshipper.BlockRef{
Ref: bloomshipper.Ref{
Expand All @@ -324,12 +324,12 @@ func createBlockQueriers(t *testing.T, numBlocks int, from, through model.Time,
return bqs, series
}

func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.ClosableBlockQuerier, [][]v1.SeriesWithBloom) {
func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, minFp, maxFp model.Fingerprint) ([]bloomshipper.BlockRef, []bloomshipper.Meta, []*bloomshipper.CloseableBlockQuerier, [][]v1.SeriesWithBloom) {
t.Helper()

blocks := make([]bloomshipper.BlockRef, 0, n)
metas := make([]bloomshipper.Meta, 0, n)
queriers := make([]*bloomshipper.ClosableBlockQuerier, 0, n)
queriers := make([]*bloomshipper.CloseableBlockQuerier, 0, n)
series := make([][]v1.SeriesWithBloom, 0, n)

step := (maxFp - minFp) / model.Fingerprint(n)
Expand Down Expand Up @@ -358,7 +358,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
Blocks: []bloomshipper.BlockRef{block},
}
blockQuerier, data := v1.MakeBlockQuerier(t, fromFp, throughFp, from, through)
querier := &bloomshipper.ClosableBlockQuerier{
querier := &bloomshipper.CloseableBlockQuerier{
BlockQuerier: blockQuerier,
BlockRef: block,
}
Expand All @@ -370,12 +370,12 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
return blocks, metas, queriers, series
}

func newMockBloomStore(bqs []*bloomshipper.ClosableBlockQuerier) *mockBloomStore {
func newMockBloomStore(bqs []*bloomshipper.CloseableBlockQuerier) *mockBloomStore {
return &mockBloomStore{bqs: bqs}
}

type mockBloomStore struct {
bqs []*bloomshipper.ClosableBlockQuerier
bqs []*bloomshipper.CloseableBlockQuerier
// mock how long it takes to serve block queriers
delay time.Duration
// mock response error when serving block queriers in ForEach
Expand Down Expand Up @@ -404,7 +404,7 @@ func (s *mockBloomStore) ForEach(_ context.Context, _ string, _ []bloomshipper.B
return s.err
}

shuffled := make([]*bloomshipper.ClosableBlockQuerier, len(s.bqs))
shuffled := make([]*bloomshipper.CloseableBlockQuerier, len(s.bqs))
_ = copy(shuffled, s.bqs)

rand.Shuffle(len(shuffled), func(i, j int) {
Expand Down Expand Up @@ -444,7 +444,7 @@ func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.Seri
return res
}

func createBlockRefsFromBlockData(t *testing.T, tenant string, data []*bloomshipper.ClosableBlockQuerier) []bloomshipper.BlockRef {
func createBlockRefsFromBlockData(t *testing.T, tenant string, data []*bloomshipper.CloseableBlockQuerier) []bloomshipper.BlockRef {
t.Helper()
res := make([]bloomshipper.BlockRef, 0)
for i := range data {
Expand Down
8 changes: 4 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config"
)

type ClosableBlockQuerier struct {
type CloseableBlockQuerier struct {
BlockRef
*v1.BlockQuerier
close func() error
}

func (c *ClosableBlockQuerier) Close() error {
func (c *CloseableBlockQuerier) Close() error {
if c.close != nil {
return c.close()
}
Expand Down Expand Up @@ -88,9 +88,9 @@ func (b BlockDirectory) Release() error {
// BlockQuerier returns a new block querier from the directory.
// It increments the counter of active queriers for this directory.
// The counter is decreased when the returned querier is closed.
func (b BlockDirectory) BlockQuerier() *ClosableBlockQuerier {
func (b BlockDirectory) BlockQuerier() *CloseableBlockQuerier {
b.Acquire()
return &ClosableBlockQuerier{
return &CloseableBlockQuerier{
BlockQuerier: v1.NewBlockQuerier(b.Block()),
BlockRef: b.BlockRef,
close: b.Release,
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type metrics struct{}

type fetcher interface {
FetchMetas(ctx context.Context, refs []MetaRef) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error)
Close()
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (f *Fetcher) writeBackMetas(ctx context.Context, metas []Meta) error {
return f.metasCache.Store(ctx, keys, data)
}

func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) {
func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) {
n := len(refs)

responses := make(chan downloadResponse[BlockDirectory], n)
Expand All @@ -140,7 +140,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*Closable
})
}

results := make([]*ClosableBlockQuerier, n)
results := make([]*CloseableBlockQuerier, n)
for i := 0; i < n; i++ {
select {
case err := <-errors:
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
type Store interface {
ResolveMetas(ctx context.Context, params MetaSearchParams) ([][]MetaRef, []*Fetcher, error)
FetchMetas(ctx context.Context, params MetaSearchParams) ([]Meta, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error)
FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error)
Fetcher(ts model.Time) (*Fetcher, error)
Client(ts model.Time) (Client, error)
Stop()
Expand Down Expand Up @@ -112,7 +112,7 @@ func (b *bloomStoreEntry) FetchMetas(ctx context.Context, params MetaSearchParam
}

// FetchBlocks implements Store.
func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*ClosableBlockQuerier, error) {
func (b *bloomStoreEntry) FetchBlocks(ctx context.Context, refs []BlockRef) ([]*CloseableBlockQuerier, error) {
return b.fetcher.FetchBlocks(ctx, refs)
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func (b *BloomStore) FetchMetas(ctx context.Context, params MetaSearchParams) ([
}

// FetchBlocks implements Store.
func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*ClosableBlockQuerier, error) {
func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*CloseableBlockQuerier, error) {

var refs [][]BlockRef
var fetchers []*Fetcher
Expand All @@ -316,7 +316,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo
}
}

results := make([]*ClosableBlockQuerier, 0, len(blocks))
results := make([]*CloseableBlockQuerier, 0, len(blocks))
for i := range fetchers {
res, err := fetchers[i].FetchBlocks(ctx, refs[i])
results = append(results, res...)
Expand All @@ -326,7 +326,7 @@ func (b *BloomStore) FetchBlocks(ctx context.Context, blocks []BlockRef) ([]*Clo
}

// sort responses (results []*CloseableBlockQuerier) based on requests (blocks []BlockRef)
slices.SortFunc(results, func(a, b *ClosableBlockQuerier) int {
slices.SortFunc(results, func(a, b *CloseableBlockQuerier) int {
ia, ib := slices.Index(blocks, a.BlockRef), slices.Index(blocks, b.BlockRef)
if ia < ib {
return -1
Expand Down

0 comments on commit e83e092

Please sign in to comment.