Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Blooms] Consistent hashing via tokens for bloomcompactor #12002

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func newBatchedChunkLoader(
time.Unix(0, 0),
time.Unix(0, math.MaxInt64),
logproto.FORWARD,
logql_log.NewNoopPipeline().ForStream(c.Metric),
logql_log.NewNoopPipeline().ForStream(nil),
)

if err != nil {
Expand Down
138 changes: 121 additions & 17 deletions pkg/bloomcompactor/bloomcompactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package bloomcompactor

import (
"context"
"fmt"
"math"
"slices"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -193,23 +197,120 @@ func (c *Compactor) tenants(ctx context.Context, table config.DayTable) (v1.Iter
}

// ownsTenant returns the ownership range for the tenant, if the compactor owns the tenant, and an error.
func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error) {
func (c *Compactor) ownsTenant(tenant string) ([]v1.FingerprintBounds, bool, error) {
tenantRing, owned := c.sharding.OwnsTenant(tenant)
if !owned {
return v1.FingerprintBounds{}, false, nil
return nil, false, nil
}

// TOOD(owen-d): use <ReadRing>.GetTokenRangesForInstance()
// when it's supported for non zone-aware rings
// instead of doing all this manually

rs, err := tenantRing.GetAllHealthy(RingOp)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting ring healthy instances")

return nil, false, errors.Wrap(err, "getting ring healthy instances")
}

keyRange, err := bloomutils.KeyRangeForInstance(c.cfg.Ring.InstanceID, rs.Instances, bloomutils.Uint64Range)
ranges, err := tokenRangesForInstance(c.cfg.Ring.InstanceID, rs.Instances)
if err != nil {
return v1.FingerprintBounds{}, false, errors.Wrap(err, "getting instance token range")
return nil, false, errors.Wrap(err, "getting token ranges for instance")
}
return v1.NewBounds(model.Fingerprint(keyRange.Min), model.Fingerprint(keyRange.Max)), true, nil

keyspaces := bloomutils.KeyspacesFromTokenRanges(ranges)
return keyspaces, true, nil
}

func tokenRangesForInstance(id string, instances []ring.InstanceDesc) (ranges ring.TokenRanges, err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we eventually move that back into dskit?

var ownedTokens map[uint32]struct{}

// lifted from grafana/dskit/ring/model.go <*Desc>.GetTokens()
toks := make([][]uint32, 0, len(instances))
for _, instance := range instances {
if instance.Id == id {
ranges = make(ring.TokenRanges, 0, 2*(len(instance.Tokens)+1))
ownedTokens = make(map[uint32]struct{}, len(instance.Tokens))
for _, tok := range instance.Tokens {
ownedTokens[tok] = struct{}{}
}
}

// Tokens may not be sorted for an older version which, so we enforce sorting here.
tokens := instance.Tokens
if !sort.IsSorted(ring.Tokens(tokens)) {
sort.Sort(ring.Tokens(tokens))
}

toks = append(toks, tokens)
}

if cap(ranges) == 0 {
return nil, fmt.Errorf("instance %s not found", id)
}

allTokens := ring.MergeTokens(toks)
if len(allTokens) == 0 {
return nil, errors.New("no tokens in the ring")
}

// mostly lifted from grafana/dskit/ring/token_range.go <*Ring>.GetTokenRangesForInstance()

// non-zero value means we're now looking for start of the range. Zero value means we're looking for next end of range (ie. token owned by this instance).
rangeEnd := uint32(0)

// if this instance claimed the first token, it owns the wrap-around range, which we'll break into two separate ranges
firstToken := allTokens[0]
_, ownsFirstToken := ownedTokens[firstToken]

if ownsFirstToken {
// we'll start by looking for the beginning of the range that ends with math.MaxUint32
rangeEnd = math.MaxUint32
}

// walk the ring backwards, alternating looking for ends and starts of ranges
for i := len(allTokens) - 1; i > 0; i-- {
token := allTokens[i]
_, owned := ownedTokens[token]

if rangeEnd == 0 {
// we're looking for the end of the next range
if owned {
rangeEnd = token - 1
}
} else {
// we have a range end, and are looking for the start of the range
if !owned {
ranges = append(ranges, rangeEnd, token)
rangeEnd = 0
}
}
}

// finally look at the first token again
// - if we have a range end, check if we claimed token 0
// - if we don't, we have our start
// - if we do, the start is 0
// - if we don't have a range end, check if we claimed token 0
// - if we don't, do nothing
// - if we do, add the range of [0, token-1]
// - BUT, if the token itself is 0, do nothing, because we don't own the tokens themselves (we should be covered by the already added range that ends with MaxUint32)

if rangeEnd == 0 {
if ownsFirstToken && firstToken != 0 {
ranges = append(ranges, firstToken-1, 0)
}
} else {
if ownsFirstToken {
ranges = append(ranges, rangeEnd, 0)
} else {
ranges = append(ranges, rangeEnd, firstToken)
}
}

// Ensure returned ranges are sorted.
slices.Sort(ranges)

return ranges, nil
}

// runs a single round of compaction for all relevant tenants and tables
Expand Down Expand Up @@ -266,25 +367,28 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
c.metrics.tenantsDiscovered.Inc()
tenant := tenants.At()
ownershipRange, owns, err := c.ownsTenant(tenant)
ownershipRanges, owns, err := c.ownsTenant(tenant)
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ranges", len(ownershipRanges), "owns", owns)
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
}
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
for _, ownershipRange := range ownershipRanges {

select {
case ch <- tenantTable{
tenant: tenant,
table: table,
ownershipRange: ownershipRange,
}:
case <-ctx.Done():
return ctx.Err()
}
}
}

Expand Down
81 changes: 74 additions & 7 deletions pkg/bloomcompactor/bloomcompactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"
"time"

"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestCompactor_ownsTenant(t *testing.T) {
require.NoError(t, err)
if ownsTenant {
compactorOwnsTenant++
compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange)
compactorOwnershipRange = append(compactorOwnershipRange, ownershipRange...)
}
}
require.Equal(t, tc.expectedCompactorsOwningTenant, compactorOwnsTenant)
Expand All @@ -135,12 +136,6 @@ func TestCompactor_ownsTenant(t *testing.T) {
coveredKeySpace.Max = boundsA.Max
}

// Assert that the fingerprint key-space is evenly distributed across the compactors
// We do some adjustments if the key-space is not evenly distributable, so we use a delta of 10
// to account for that and check that the key-space is reasonably evenly distributed.
fpPerTenant := math.MaxUint64 / uint64(tc.expectedCompactorsOwningTenant)
boundsLen := uint64(boundsA.Max - boundsA.Min)
require.InDelta(t, fpPerTenant, boundsLen, 10)
}
// Assert that the fingerprint key-space is complete
require.True(t, coveredKeySpace.Equal(v1.NewBounds(0, math.MaxUint64)))
Expand Down Expand Up @@ -195,3 +190,75 @@ func (m mockLimits) BloomFalsePositiveRate(_ string) float64 {
func (m mockLimits) BloomCompactorMaxBlockSize(_ string) int {
panic("implement me")
}

func TestTokenRangesForInstance(t *testing.T) {
desc := func(id int, tokens ...uint32) ring.InstanceDesc {
return ring.InstanceDesc{Id: fmt.Sprintf("%d", id), Tokens: tokens}
}

tests := map[string]struct {
input []ring.InstanceDesc
exp map[string]ring.TokenRanges
err bool
}{
"no nodes": {
input: []ring.InstanceDesc{},
exp: map[string]ring.TokenRanges{
"0": {0, math.MaxUint32}, // have to put one in here to trigger test
},
err: true,
},
"one node": {
input: []ring.InstanceDesc{
desc(0, 0, 100),
},
exp: map[string]ring.TokenRanges{
"0": {0, math.MaxUint32},
},
},
"two nodes": {
input: []ring.InstanceDesc{
desc(0, 25, 75),
desc(1, 10, 50, 100),
},
exp: map[string]ring.TokenRanges{
"0": {10, 24, 50, 74},
"1": {0, 9, 25, 49, 75, math.MaxUint32},
},
},
"consecutive tokens": {
input: []ring.InstanceDesc{
desc(0, 99),
desc(1, 100),
},
exp: map[string]ring.TokenRanges{
"0": {0, 98, 100, math.MaxUint32},
"1": {99, 99},
},
},
"extremes": {
input: []ring.InstanceDesc{
desc(0, 0),
desc(1, math.MaxUint32),
},
exp: map[string]ring.TokenRanges{
"0": {math.MaxUint32, math.MaxUint32},
"1": {0, math.MaxUint32 - 1},
},
},
}

for desc, test := range tests {
t.Run(desc, func(t *testing.T) {
for id := range test.exp {
ranges, err := tokenRangesForInstance(id, test.input)
if test.err {
require.Error(t, err)
continue
}
require.NoError(t, err)
require.Equal(t, test.exp[id], ranges)
}
})
}
}
49 changes: 10 additions & 39 deletions pkg/bloomutils/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
package bloomutils

import (
"errors"
"fmt"
"math"
"sort"

"github.com/grafana/dskit/ring"
"github.com/prometheus/common/model"
"golang.org/x/exp/constraints"
"golang.org/x/exp/slices"

v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)
Expand Down Expand Up @@ -68,44 +67,16 @@ func (i InstancesWithTokenRange) Contains(token uint32) bool {
return false
}

// KeyRangeForInstance calculates the token range for a specific instance
// with given id based on the first token in the ring.
// This assumes that each instance in the ring is configured with only a single
// token.
func KeyRangeForInstance[T constraints.Unsigned](id string, instances []ring.InstanceDesc, keyspace Range[T]) (Range[T], error) {

// Sort instances -- they may not be sorted
// because they're usually accessed by looking up the tokens (which are sorted)
sort.Slice(instances, func(i, j int) bool {
return instances[i].Tokens[0] < instances[j].Tokens[0]
})

idx := slices.IndexFunc(instances, func(inst ring.InstanceDesc) bool {
return inst.Id == id
})

// instance with Id == id not found
if idx == -1 {
return Range[T]{}, ring.ErrInstanceNotFound
// TODO(owen-d): use https://github.com/grafana/loki/pull/11975 after merge
func KeyspacesFromTokenRanges(tokenRanges ring.TokenRanges) []v1.FingerprintBounds {
keyspaces := make([]v1.FingerprintBounds, 0, len(tokenRanges)/2)
for i := 0; i < len(tokenRanges)-1; i += 2 {
keyspaces = append(keyspaces, v1.FingerprintBounds{
Min: model.Fingerprint(tokenRanges[i]) << 32,
Max: model.Fingerprint(tokenRanges[i+1])<<32 | model.Fingerprint(math.MaxUint32),
})
}

diff := keyspace.Max - keyspace.Min
i := T(idx)
n := T(len(instances))

if diff < n {
return Range[T]{}, errors.New("keyspace is smaller than amount of instances")
}

step := diff / n
min := step * i
max := step*i + step - 1
if i == n-1 {
// extend the last token tange to MaxUint32
max = (keyspace.Max - keyspace.Min)
}

return Range[T]{min, max}, nil
return keyspaces
}

// NewInstanceSortMergeIterator creates an iterator that yields instanceWithToken elements
Expand Down
Loading
Loading