Skip to content

Commit

Permalink
[Blooms] Consistent hashing via tokens for bloomcompactor (#12002)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Feb 20, 2024
1 parent eb1379a commit 1662298
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func newBatchedChunkLoader(
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
logql_log.NewNoopPipeline().ForStream(nil),
)

if err != nil {
Expand Down
138 changes: 121 additions & 17 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package bloomcompactor

import (
"context"
"fmt"
"math"
"slices"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -193,23 +197,120 @@ func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iter
}

// ownsTenant returns the ownership ranges for the tenant, whether the compactor owns the tenant, and an error.
func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error) {
func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, error) {
tenantRing, owned := c.sharding.OwnsTenant(tenant)
if !owned {
return v1.FingerprintBounds{}, false, nil
return nil, false, nil
}

// TODO(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// when it's supported for non zone-aware rings
// instead of doing all this manually

rs, err := tenantRing.GetAllHealthy(RingOp)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting ring healthy instances")

return nil, false, errors.Wrap(err, "getting ring healthy instances")
}

keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range)
ranges, err := tokenRangesForInstance(c.cfg.Ring.InstanceID, rs.Instances)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
return nil, false, errors.Wrap(err, "getting token ranges for instance")
}
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil

keyspaces := bloomutils.KeyspacesFromTokenRanges(ranges)
return keyspaces, true, nil
}

// tokenRangesForInstance computes the token ranges owned by the instance with
// the given id, derived from the tokens of all given instances.
//
// The result is a flat, ascending list of inclusive [start, end] pairs
// (start0, end0, start1, end1, ...). An owned token T closes a range at T-1;
// the range starts at the closest preceding token that is not owned by the
// instance. If the instance owns the first token of the merged ring, it also
// owns the wrap-around range, which is split into one range ending at
// math.MaxUint32 and (unless the first token is 0) one range starting at 0.
//
// An error is returned if no instance with the given id exists or if the
// merged ring contains no tokens.
//
// Note: the Tokens slice of an instance is sorted in place if it is unsorted.
func tokenRangesForInstance(id string, instances []ring.InstanceDesc) (ranges ring.TokenRanges, err error) {
	var ownedTokens map[uint32]struct{}

	// lifted from grafana/dskit/ring/model.go <*Desc>.GetTokens()
	toks := make([][]uint32, 0, len(instances))
	for _, instance := range instances {
		if instance.Id == id {
			ranges = make(ring.TokenRanges, 0, 2*(len(instance.Tokens)+1))
			ownedTokens = make(map[uint32]struct{}, len(instance.Tokens))
			for _, tok := range instance.Tokens {
				ownedTokens[tok] = struct{}{}
			}
		}

		// Tokens may not be sorted for an older version, so we enforce sorting here.
		tokens := instance.Tokens
		if !sort.IsSorted(ring.Tokens(tokens)) {
			sort.Sort(ring.Tokens(tokens))
		}

		toks = append(toks, tokens)
	}

	// ranges is allocated with a capacity of at least 2 when the instance is
	// found, so a zero capacity means the id never matched.
	if cap(ranges) == 0 {
		return nil, fmt.Errorf("instance %s not found", id)
	}

	allTokens := ring.MergeTokens(toks)
	if len(allTokens) == 0 {
		return nil, errors.New("no tokens in the ring")
	}

	// mostly lifted from grafana/dskit/ring/token_range.go <*Ring>.GetTokenRangesForInstance()

	// haveRangeEnd reports whether rangeEnd holds the end of a range whose start
	// we are still looking for. It is tracked separately from rangeEnd because 0
	// is a valid range end (owned token 1 directly following token 0), so the
	// value itself cannot double as the "no pending range" marker.
	var (
		rangeEnd     uint32
		haveRangeEnd bool
	)

	// if this instance claimed the first token, it owns the wrap-around range, which we'll break into two separate ranges
	firstToken := allTokens[0]
	_, ownsFirstToken := ownedTokens[firstToken]

	if ownsFirstToken {
		// we'll start by looking for the beginning of the range that ends with math.MaxUint32
		rangeEnd = math.MaxUint32
		haveRangeEnd = true
	}

	// walk the ring backwards, alternating looking for ends and starts of ranges
	for i := len(allTokens) - 1; i > 0; i-- {
		token := allTokens[i]
		_, owned := ownedTokens[token]

		switch {
		case !haveRangeEnd && owned:
			// found the end of the next range; token >= 1 here because i > 0
			rangeEnd = token - 1
			haveRangeEnd = true
		case haveRangeEnd && !owned:
			// found the start of the range we have an end for
			ranges = append(ranges, rangeEnd, token)
			haveRangeEnd = false
		}
	}

	// finally look at the first token again
	// - if we have a range end, check if we claimed the first token
	//   - if we don't, the first token is our start
	//   - if we do, the start is 0
	// - if we don't have a range end, check if we claimed the first token
	//   - if we don't, do nothing
	//   - if we do, add the range of [0, token-1]
	//     - BUT, if the token itself is 0, do nothing, because we don't own the tokens themselves
	//       (we should be covered by the already added range that ends with MaxUint32)
	switch {
	case haveRangeEnd && ownsFirstToken:
		ranges = append(ranges, rangeEnd, 0)
	case haveRangeEnd:
		ranges = append(ranges, rangeEnd, firstToken)
	case ownsFirstToken && firstToken != 0:
		ranges = append(ranges, firstToken-1, 0)
	}

	// Pairs were appended as (end, start) while walking backwards, so sorting
	// the flat list yields ascending (start, end) pairs.
	slices.Sort(ranges)

	return ranges, nil
}

// runs a single round of compaction for all relevant tenants and tables
Expand Down Expand Up @@ -266,25 +367,28 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
c.metrics.tenantsDiscovered.Inc()
tenant := tenants.At()
ownershipRange, owns, err := c.ownsTenant(tenant)
ownershipRanges, owns, err := c.ownsTenant(tenant)
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ranges", len(ownershipRanges), "owns", owns)
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
}
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
for _, ownershipRange := range ownershipRanges {

select {
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

Expand Down
81 changes: 74 additions & 7 deletions pkg/bloomcompactor/bloomcompactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestCompactor_ownsTenant(t *testing.T) {
require.NoError(t, err)
if ownsTenant {
compactorOwnsTenant++
compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange)
compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange...)
}
}
require.Equal(t, tc.expectedCompactorsOwningTenant, compactorOwnsTenant)
Expand All @@ -135,12 +136,6 @@ func TestCompactor_ownsTenant(t *testing.T) {
coveredKeySpace.Max = boundsA.Max
}

// Assert that the fingerprint key-space is evenly distributed across the compactors
// We do some adjustments if the key-space is not evenly distributable, so we use a delta of 10
// to account for that and check that the key-space is reasonably evenly distributed.
fpPerTenant := math.MaxUint64 / uint64(tc.expectedCompactorsOwningTenant)
boundsLen := uint64(boundsA.Max - boundsA.Min)
require.InDelta(t, fpPerTenant, boundsLen, 10)
}
// Assert that the fingerprint key-space is complete
require.True(t, coveredKeySpace.Equal(v1.NewBounds(0, math.MaxUint64)))
Expand Down Expand Up @@ -195,3 +190,75 @@ func (m mockLimits) BloomFalsePositiveRate(_ string) float64 {
func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int {
panic("implement me")
}

// TestTokenRangesForInstance verifies, for a set of small hand-built rings,
// that tokenRangesForInstance returns the expected flat list of inclusive
// [start, end] token pairs for every instance id listed in the expectation,
// or an error where one is expected.
func TestTokenRangesForInstance(t *testing.T) {
	// instance builds a ring member whose Id is the decimal form of id.
	instance := func(id int, tokens ...uint32) ring.InstanceDesc {
		return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens}
	}

	type testCase struct {
		input []ring.InstanceDesc
		exp   map[string]ring.TokenRanges
		err   bool
	}

	cases := map[string]testCase{
		"no nodes": {
			input: []ring.InstanceDesc{},
			exp: map[string]ring.TokenRanges{
				"0": {0, math.MaxUint32}, // have to put one in here to trigger test
			},
			err: true,
		},
		"one node": {
			input: []ring.InstanceDesc{
				instance(0, 0, 100),
			},
			exp: map[string]ring.TokenRanges{
				"0": {0, math.MaxUint32},
			},
		},
		"two nodes": {
			input: []ring.InstanceDesc{
				instance(0, 25, 75),
				instance(1, 10, 50, 100),
			},
			exp: map[string]ring.TokenRanges{
				"0": {10, 24, 50, 74},
				"1": {0, 9, 25, 49, 75, math.MaxUint32},
			},
		},
		"consecutive tokens": {
			input: []ring.InstanceDesc{
				instance(0, 99),
				instance(1, 100),
			},
			exp: map[string]ring.TokenRanges{
				"0": {0, 98, 100, math.MaxUint32},
				"1": {99, 99},
			},
		},
		"extremes": {
			input: []ring.InstanceDesc{
				instance(0, 0),
				instance(1, math.MaxUint32),
			},
			exp: map[string]ring.TokenRanges{
				"0": {math.MaxUint32, math.MaxUint32},
				"1": {0, math.MaxUint32 - 1},
			},
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// Each expected entry drives one lookup against the same ring.
			for id, want := range tc.exp {
				got, err := tokenRangesForInstance(id, tc.input)
				if tc.err {
					require.Error(t, err)
					continue
				}
				require.NoError(t, err)
				require.Equal(t, want, got)
			}
		})
	}
}
49 changes: 10 additions & 39 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
package bloomutils

import (
"errors"
"fmt"
"math"
"sort"

"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
Expand Down Expand Up @@ -72,44 +71,16 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
return false
}

// KeyRangeForInstance calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func KeyRangeForInstance[T constraints.Unsigned](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
sort.Slice(instances, func(i, j int) bool {
return instances[i].Tokens[0] < instances[j].Tokens[0]
})

idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool {
return inst.Id == id
})

// instance with Id == id not found
if idx == -1 {
return Range[T]{}, ring.ErrInstanceNotFound
// TODO(owen-d): use https://github.com/grafana/loki/pull/11975 after merge
func KeyspacesFromTokenRanges(tokenRanges ring.TokenRanges) []v1.FingerprintBounds {
keyspaces := make([]v1.FingerprintBounds, 0, len(tokenRanges)/2)
for i := 0; i < len(tokenRanges)-1; i += 2 {
keyspaces = append(keyspaces, v1.FingerprintBounds{
Min: model.Fingerprint(tokenRanges[i]) << 32,
Max: model.Fingerprint(tokenRanges[i+1])<<32 | model.Fingerprint(math.MaxUint32),
})
}

diff := keyspace.Max - keyspace.Min
i := T(idx)
n := T(len(instances))

if diff < n {
return Range[T]{}, errors.New("keyspace is smaller than amount of instances")
}

step := diff / n
min := step * i
max := step*i + step - 1
if i == n-1 {
// extend the last token range to MaxUint32
max = (keyspace.Max - keyspace.Min)
}

return Range[T]{min, max}, nil
return keyspaces
}

// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
Expand Down
Loading

0 comments on commit 1662298

Please sign in to comment.