From ea908b1160df0823e5fb66a9ce1bc623d3c63c2b Mon Sep 17 00:00:00 2001
From: Yi Jin
Date: Fri, 22 Nov 2024 18:12:43 -0800
Subject: [PATCH] [ES-1314123] fix race condition in memoized TSDB clients

Signed-off-by: Yi Jin
---
 docs/sharding.md              |  2 +-
 pkg/receive/multitsdb.go      | 70 ++++++++---------------------------
 pkg/receive/multitsdb_test.go | 42 +++++++++++++++++++++
 3 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/docs/sharding.md b/docs/sharding.md
index 9cfa0fbf8a..f943ec071b 100644
--- a/docs/sharding.md
+++ b/docs/sharding.md
@@ -18,7 +18,7 @@ Queries against store gateway which are touching large number of blocks (no matt
 
 # Relabelling
 
-Similar to [promtail](https://grafana.com/docs/loki/latest/send-data/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
+Similar to [promtail](https://grafana.com/docs/loki/latest/clients/promtail/configuration/#relabel_configs) this config follows native [Prometheus relabel-config](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config) syntax.
 
 Currently, thanos only supports the following relabel actions:
 
diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index d40cf43c06..5fd8352b4b 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -62,12 +62,6 @@ type MultiTSDB struct {
 	hashFunc        metadata.HashFunc
 	hashringConfigs []HashringConfig
 
-	tsdbClients           []store.Client
-	tsdbClientsNeedUpdate bool
-
-	exemplarClients           map[string]*exemplars.TSDB
-	exemplarClientsNeedUpdate bool
-
 	metricNameFilterEnabled bool
 }
 
@@ -100,19 +94,17 @@ func NewMultiTSDB(
 	}
 
 	mt := &MultiTSDB{
-		dataDir:                   dataDir,
-		logger:                    log.With(l, "component", "multi-tsdb"),
-		reg:                       reg,
-		tsdbOpts:                  tsdbOpts,
-		mtx:                       &sync.RWMutex{},
-		tenants:                   map[string]*tenant{},
-		labels:                    labels,
-		tsdbClientsNeedUpdate:     true,
-		exemplarClientsNeedUpdate: true,
-		tenantLabelName:           tenantLabelName,
-		bucket:                    bucket,
-		allowOutOfOrderUpload:     allowOutOfOrderUpload,
-		hashFunc:                  hashFunc,
+		dataDir:               dataDir,
+		logger:                log.With(l, "component", "multi-tsdb"),
+		reg:                   reg,
+		tsdbOpts:              tsdbOpts,
+		mtx:                   &sync.RWMutex{},
+		tenants:               map[string]*tenant{},
+		labels:                labels,
+		tenantLabelName:       tenantLabelName,
+		bucket:                bucket,
+		allowOutOfOrderUpload: allowOutOfOrderUpload,
+		hashFunc:              hashFunc,
 	}
 
 	for _, option := range options {
@@ -435,8 +427,6 @@ func (t *MultiTSDB) Prune(ctx context.Context) error {
 	for _, tenantID := range prunedTenants {
 		level.Info(t.logger).Log("msg", "Pruned tenant", "tenant", tenantID)
 		delete(t.tenants, tenantID)
-		t.tsdbClientsNeedUpdate = true
-		t.exemplarClientsNeedUpdate = true
 	}
 
 	return merr.Err()
@@ -598,17 +588,7 @@ func (t *MultiTSDB) RemoveLockFilesIfAny() error {
 
 func (t *MultiTSDB) TSDBLocalClients() []store.Client {
 	t.mtx.RLock()
-	if !t.tsdbClientsNeedUpdate {
-		t.mtx.RUnlock()
-		return t.tsdbClients
-	}
-
-	t.mtx.RUnlock()
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-	if !t.tsdbClientsNeedUpdate {
-		return t.tsdbClients
-	}
+	defer t.mtx.RUnlock()
 
 	res := make([]store.Client, 0, len(t.tenants))
 	for _, tenant := range t.tenants {
@@ -618,25 +598,12 @@ func (t *MultiTSDB) TSDBLocalClients() []store.Client {
 		}
 	}
 
-	t.tsdbClientsNeedUpdate = false
-	t.tsdbClients = res
-
-	return t.tsdbClients
+	return res
 }
 
 func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
 	t.mtx.RLock()
-	if !t.exemplarClientsNeedUpdate {
-		t.mtx.RUnlock()
-		return t.exemplarClients
-	}
-	t.mtx.RUnlock()
-	t.mtx.Lock()
-	defer t.mtx.Unlock()
-
-	if !t.exemplarClientsNeedUpdate {
-		return t.exemplarClients
-	}
+	defer t.mtx.RUnlock()
 
 	res := make(map[string]*exemplars.TSDB, len(t.tenants))
 	for k, tenant := range t.tenants {
@@ -645,10 +612,7 @@ func (t *MultiTSDB) TSDBExemplars() map[string]*exemplars.TSDB {
 			res[k] = e
 		}
 	}
-
-	t.exemplarClientsNeedUpdate = false
-	t.exemplarClients = res
-	return t.exemplarClients
+	return res
 }
 
 func (t *MultiTSDB) TenantStats(limit int, statsByLabelName string, tenantIDs ...string) []status.TenantStats {
@@ -725,8 +689,6 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant
 	if err != nil {
 		t.mtx.Lock()
 		delete(t.tenants, tenantID)
-		t.tsdbClientsNeedUpdate = true
-		t.exemplarClientsNeedUpdate = true
 		t.mtx.Unlock()
 		return err
 	}
@@ -779,8 +741,6 @@ func (t *MultiTSDB) getOrLoadTenant(tenantID string, blockingStart bool) (*tenan
 
 	tenant = newTenant()
 	t.tenants[tenantID] = tenant
-	t.tsdbClientsNeedUpdate = true
-	t.exemplarClientsNeedUpdate = true
 	t.mtx.Unlock()
 
 	logger := log.With(t.logger, "tenant", tenantID)
diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go
index 4bee9c0514..a846dfbbdd 100644
--- a/pkg/receive/multitsdb_test.go
+++ b/pkg/receive/multitsdb_test.go
@@ -5,6 +5,7 @@ package receive
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"math"
 	"os"
@@ -541,6 +542,47 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) {
 	testutil.Equals(t, 1, len(m.TSDBLocalClients()))
 }
 
+func TestMultiTSDBAddNewTenant(t *testing.T) {
+	t.Parallel()
+	const iterations = 10
+	// This test detects race conditions, so we run it multiple times to increase the chance of catching the issue.
+	for i := 0; i < iterations; i++ {
+		t.Run(fmt.Sprintf("iteration-%d", i), func(t *testing.T) {
+			dir := t.TempDir()
+			m := NewMultiTSDB(dir, log.NewNopLogger(), prometheus.NewRegistry(),
+				&tsdb.Options{
+					MinBlockDuration:  (2 * time.Hour).Milliseconds(),
+					MaxBlockDuration:  (2 * time.Hour).Milliseconds(),
+					RetentionDuration: (6 * time.Hour).Milliseconds(),
+				},
+				labels.FromStrings("replica", "test"),
+				"tenant_id",
+				objstore.NewInMemBucket(),
+				false,
+				metadata.NoneFunc,
+			)
+			defer func() { testutil.Ok(t, m.Close()) }()
+
+			concurrency := 50
+			var wg sync.WaitGroup
+			for i := 0; i < concurrency; i++ {
+				wg.Add(1)
+				// simulate remote write with new tenant concurrently
+				go func(i int) {
+					defer wg.Done()
+					testutil.Ok(t, appendSample(m, fmt.Sprintf("tenant-%d", i), time.UnixMilli(int64(10))))
+				}(i)
+				// simulate read request concurrently
+				go func() {
+					m.TSDBLocalClients()
+				}()
+			}
+			wg.Wait()
+			testutil.Equals(t, concurrency, len(m.TSDBLocalClients()))
+		})
+	}
+}
+
 func TestAlignedHeadFlush(t *testing.T) {
 	t.Parallel()
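The window the removed caching left open is visible from the diff alone: `tsdbClientsNeedUpdate` was only set to true where a tenant was added or pruned, while a new tenant's `client()` stays nil until `startTSDB` finishes. A reader that rebuilt the cache inside that window stored a snapshot without the new tenant and cleared the flag, and no remaining code path set the flag again once the TSDB became ready, so the stale snapshot kept being returned; this appears to be the interleaving `TestMultiTSDBAddNewTenant` is built to provoke. The sketch below distills that failure mode into a self-contained program; `registry`, `addTenant`, `markReady`, and `clients` are invented stand-ins for the `MultiTSDB` machinery, not code from this repository.

```go
// Minimal sketch (assumed simplification, not the Thanos code) of the cached-clients
// pattern the patch removes. If clients() rebuilds while a tenant is registered but
// not yet ready, the cache is marked clean and the late-arriving client is never seen.
package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu          sync.RWMutex
	tenants     map[string]*string // nil value = tenant registered but its TSDB is still starting
	cache       []string
	needsUpdate bool
}

func (r *registry) addTenant(id string) {
	r.mu.Lock()
	r.tenants[id] = nil
	r.needsUpdate = true // analogous to tsdbClientsNeedUpdate = true in getOrLoadTenant
	r.mu.Unlock()
}

func (r *registry) markReady(id string) {
	r.mu.Lock()
	c := id + "-client"
	r.tenants[id] = &c
	// Nothing resets needsUpdate here, mirroring the old success path of startTSDB.
	r.mu.Unlock()
}

// clients mimics the removed check-then-rebuild caching in TSDBLocalClients.
func (r *registry) clients() []string {
	r.mu.RLock()
	if !r.needsUpdate {
		defer r.mu.RUnlock()
		return r.cache
	}
	r.mu.RUnlock()

	r.mu.Lock()
	defer r.mu.Unlock()
	res := make([]string, 0, len(r.tenants))
	for _, c := range r.tenants {
		if c != nil { // not-yet-ready tenants are skipped, like tenant.client() == nil
			res = append(res, *c)
		}
	}
	r.needsUpdate = false
	r.cache = res
	return r.cache
}

func main() {
	r := &registry{tenants: map[string]*string{}}
	r.addTenant("tenant-0")
	_ = r.clients() // stands in for a concurrent read: caches an empty snapshot, clears the flag
	r.markReady("tenant-0")
	fmt.Println(len(r.clients())) // prints 0: the ready client never shows up in the cached list
}
```

With the patch applied, the equivalent of `clients()` rebuilds the slice under the read lock on every call, so a tenant becomes visible as soon as its client is set and the dirty-flag bookkeeping disappears.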
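The trade-off in the fix is that `TSDBLocalClients` and `TSDBExemplars` now allocate and rebuild their result on every call instead of returning a cached value, an O(number of tenants) cost taken under the read lock; for the tenant counts a receiver typically hosts this is presumably negligible next to the work the returned clients do. The new test can be run on its own, for example with `go test -race -run TestMultiTSDBAddNewTenant ./pkg/receive/...`; the 10 iterations and 50 concurrent writers exist only to make the problematic schedule likely, since a single pass can succeed even against the unfixed code.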