Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ES-1314123] race condition for memoized tsdb client #108

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/sharding.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt

# Relabelling

Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.

Currently, thanos only supports the following relabel actions:

Expand Down
70 changes: 15 additions & 55 deletions pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ type MultiTSDB struct {
hashFunc metadata.HashFunc
hashringConfigs []HashringConfig

tsdbClients []store.Client
tsdbClientsNeedUpdate bool

exemplarClients map[string]*exemplars.TSDB
exemplarClientsNeedUpdate bool

metricNameFilterEnabled bool
}

Expand Down Expand Up @@ -100,19 +94,17 @@ func NewMultiTSDB(
}

mt := &MultiTSDB{
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tsdbClientsNeedUpdate: true,
exemplarClientsNeedUpdate: true,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
dataDir: dataDir,
logger: log.With(l, "component", "multi-tsdb"),
reg: reg,
tsdbOpts: tsdbOpts,
mtx: &sync.RWMutex{},
tenants: map[string]*tenant{},
labels: labels,
tenantLabelName: tenantLabelName,
bucket: bucket,
allowOutOfOrderUpload: allowOutOfOrderUpload,
hashFunc: hashFunc,
}

for _, option := range options {
Expand Down Expand Up @@ -435,8 +427,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {

level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
}

return merr.Err()
Expand Down Expand Up @@ -598,17 +588,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {

func (t *MultiTSDB) TSDBLocalClients() []store.Client {
t.mtx.RLock()
if !t.tsdbClientsNeedUpdate {
t.mtx.RUnlock()
return t.tsdbClients
}

t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()
if !t.tsdbClientsNeedUpdate {
return t.tsdbClients
}
defer t.mtx.RUnlock()

res := make([]store.Client, 0, len(t.tenants))
for _, tenant := range t.tenants {
Expand All @@ -618,25 +598,12 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
}
}

t.tsdbClientsNeedUpdate = false
t.tsdbClients = res

return t.tsdbClients
return res
}

func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
t.mtx.RLock()
if !t.exemplarClientsNeedUpdate {
t.mtx.RUnlock()
return t.exemplarClients
}
t.mtx.RUnlock()
t.mtx.Lock()
defer t.mtx.Unlock()

if !t.exemplarClientsNeedUpdate {
return t.exemplarClients
}
defer t.mtx.RUnlock()

res := make(map[string]*exemplars.TSDB, len(t.tenants))
for k, tenant := range t.tenants {
Expand All @@ -645,10 +612,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
res[k] = e
}
}

t.exemplarClientsNeedUpdate = false
t.exemplarClients = res
return t.exemplarClients
return res
}

func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
Expand Down Expand Up @@ -725,8 +689,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
if err != nil {
t.mtx.Lock()
delete(t.tenants, tenantID)
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()
return err
}
Expand Down Expand Up @@ -779,8 +741,6 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan

tenant = newTenant()
t.tenants[tenantID] = tenant
t.tsdbClientsNeedUpdate = true
t.exemplarClientsNeedUpdate = true
t.mtx.Unlock()

logger := log.With(t.logger, "tenant", tenantID)
Expand Down
42 changes: 42 additions & 0 deletions pkg/receive/multitsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package receive

import (
"context"
"fmt"
"io"
"math"
"os"
Expand Down Expand Up @@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
testutil.Equals(t, 1, len(m.TSDBLocalClients()))
}

// TestMultiTSDBAddNewTenant stresses concurrent tenant creation (writes) against
// concurrent TSDBLocalClients() calls (reads) to surface data races in MultiTSDB.
// Run with -race; a single pass may not hit the interleaving, hence the retries.
func TestMultiTSDBAddNewTenant(t *testing.T) {
	t.Parallel()
	const iterations = 10
	// This test detects race conditions, so we run it multiple times to increase the chance of catching the issue.
	for i := 0; i < iterations; i++ {
		t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) {
			dir := t.TempDir()
			m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
				&tsdb.Options{
					MinBlockDuration:  (2 * time.Hour).Milliseconds(),
					MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
					RetentionDuration: (6 * time.Hour).Milliseconds(),
				},
				labels.FromStrings("replica", "test"),
				"tenant_id",
				objstore.NewInMemBucket(),
				false,
				metadata.NoneFunc,
			)
			defer func() { testutil.Ok(t, m.Close()) }()

			concurrency := 50
			var wg sync.WaitGroup
			for i := 0; i < concurrency; i++ {
				// Track both the writer and the reader goroutine: previously the
				// readers were not in the WaitGroup, so they could still be running
				// when the deferred m.Close() fired, racing shutdown and leaking
				// goroutines past the subtest's end.
				wg.Add(2)
				// Simulate a remote-write request that creates a brand-new tenant.
				// NOTE(review): testutil.Ok presumably calls t.Fatal, which the
				// testing package requires to run on the test goroutine — confirm
				// testutil's behavior from a spawned goroutine.
				go func(i int) {
					defer wg.Done()
					testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10))))
				}(i)
				// Simulate a concurrent read request listing the local clients.
				go func() {
					defer wg.Done()
					m.TSDBLocalClients()
				}()
			}
			wg.Wait()
			// Every writer created exactly one distinct tenant.
			testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
		})
	}
}

func TestAlignedHeadFlush(t *testing.T) {
t.Parallel()

Expand Down
Loading