Skip to content

Commit

Permalink
Merge branch 'main' into tpatterson/compactor-resource-limtis
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored Feb 16, 2024
2 parents 1f17ea2 + ef40136 commit 5f55e24
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 46 deletions.
38 changes: 28 additions & 10 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ func runWithRetries(

type tenantTable struct {
tenant string
table config.DayTime
table config.DayTable
ownershipRange v1.FingerprintBounds
}

func (c *Compactor) tenants(ctx context.Context, table config.DayTime) (v1.Iterator[string], error) {
func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iterator[string], error) {
tenants, err := c.tsdbStore.UsersForPeriod(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "getting tenants")
Expand Down Expand Up @@ -241,15 +241,15 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
tables := c.tables(time.Now())

for tables.Next() && tables.Err() == nil && ctx.Err() == nil {

table := tables.At()

tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
Expand All @@ -269,7 +269,11 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
Expand Down Expand Up @@ -332,19 +336,33 @@ func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) erro

type dayRangeIterator struct {
min, max, cur config.DayTime
curPeriod config.PeriodConfig
schemaCfg config.SchemaConfig
err error
}

func newDayRangeIterator(min, max config.DayTime) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec()}
func newDayRangeIterator(min, max config.DayTime, schemaCfg config.SchemaConfig) *dayRangeIterator {
return &dayRangeIterator{min: min, max: max, cur: min.Dec(), schemaCfg: schemaCfg}
}

func (r *dayRangeIterator) Next() bool {
r.cur = r.cur.Inc()
return r.cur.Before(r.max)
if !r.cur.Before(r.max) {
return false
}

period, err := r.schemaCfg.SchemaForTime(r.cur.ModelTime())
if err != nil {
r.err = errors.Wrapf(err, "getting schema for time (%s)", r.cur)
return false
}
r.curPeriod = period

return true
}

func (r *dayRangeIterator) At() config.DayTime {
return r.cur
func (r *dayRangeIterator) At() config.DayTable {
return config.NewDayTable(r.cur, r.curPeriod.IndexTables.Prefix)
}

func (r *dayRangeIterator) Err() error {
Expand Down
12 changes: 6 additions & 6 deletions pkg/bloomcompactor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ Compaction works as follows, split across many functions for clarity:
*/
func (s *SimpleBloomController) compactTenant(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table)
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr())
level.Error(logger).Log("msg", "failed to get client", "err", err)
return errors.Wrap(err, "failed to get client")
}

Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *SimpleBloomController) compactTenant(
func (s *SimpleBloomController) findOutdatedGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
Expand Down Expand Up @@ -215,7 +215,7 @@ func (s *SimpleBloomController) findOutdatedGaps(

func (s *SimpleBloomController) loadWorkForGap(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
gap gapWithBlocks,
Expand All @@ -241,7 +241,7 @@ func (s *SimpleBloomController) loadWorkForGap(
func (s *SimpleBloomController) buildGaps(
ctx context.Context,
tenant string,
table config.DayTime,
table config.DayTable,
client bloomshipper.Client,
work []blockPlan,
logger log.Logger,
Expand Down
24 changes: 12 additions & 12 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ const (
)

type TSDBStore interface {
UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error)
ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error)
LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand All @@ -49,12 +49,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
Expand All @@ -80,7 +80,7 @@ func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime,

func (b *BloomTSDBStore) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
Expand Down Expand Up @@ -272,17 +272,17 @@ func (s *TSDBStores) storeForPeriod(table config.DayTime) (TSDBStore, error) {
)
}

func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) UsersForPeriod(ctx context.Context, table config.DayTable) ([]string, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}

return store.UsersForPeriod(ctx, table)
}

func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table)
func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand All @@ -292,12 +292,12 @@ func (s *TSDBStores) ResolveTSDBs(ctx context.Context, table config.DayTime, ten

func (s *TSDBStores) LoadTSDB(
ctx context.Context,
table config.DayTime,
table config.DayTable,
tenant string,
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
store, err := s.storeForPeriod(table)
store, err := s.storeForPeriod(table.DayTime)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
}
ref := bloomshipper.Ref{
TenantID: tenant,
TableName: config.NewDayTime(truncateDay(from)).Addr(),
TableName: config.NewDayTable(config.NewDayTime(truncateDay(from)), "").Addr(),
Bounds: v1.NewBounds(fromFp, throughFp),
StartTimestamp: from,
EndTimestamp: through,
Expand Down
41 changes: 24 additions & 17 deletions pkg/storage/config/schema_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,6 @@ func (cfg *PeriodConfig) GetIndexTableNumberRange(schemaEndDate DayTime) TableRa
}
}

func (cfg *PeriodConfig) GetFullTableName(t model.Time) string {
return NewDayTime(t).TableWithPrefix(cfg)
}

func NewDayTime(d model.Time) DayTime {
return DayTime{d}
}
Expand Down Expand Up @@ -237,19 +233,6 @@ func (d DayTime) String() string {
return d.Time.Time().UTC().Format("2006-01-02")
}

// Addr returns the unix day offset as a string, which is used
// as the address for the index table in storage.
func (d DayTime) Addr() string {
return fmt.Sprintf("%d",
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

func (d DayTime) TableWithPrefix(cfg *PeriodConfig) string {
return fmt.Sprintf("%s%d",
cfg.IndexTables.Prefix,
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

func (d DayTime) Inc() DayTime {
return DayTime{d.Add(ObjectStorageIndexRequiredPeriod)}
}
Expand All @@ -274,6 +257,30 @@ func (d DayTime) Bounds() (model.Time, model.Time) {
return d.Time, d.Inc().Time
}

type DayTable struct {
DayTime
Prefix string
}

func (d DayTable) String() string {
return d.Addr()
}

func NewDayTable(d DayTime, prefix string) DayTable {
return DayTable{
DayTime: d,
Prefix: prefix,
}
}

// Addr returns the prefix (if any) and the unix day offset as a string, which is used
// as the address for the index table in storage.
func (d DayTable) Addr() string {
return fmt.Sprintf("%s%d",
d.Prefix,
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

// SchemaConfig contains the config for our chunk index schemas
type SchemaConfig struct {
Configs []PeriodConfig `yaml:"configs"`
Expand Down

0 comments on commit 5f55e24

Please sign in to comment.