From e398fa84bb3b17d59196e566b0b759687923b848 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Thu, 20 Jun 2024 10:49:47 +0300 Subject: [PATCH 1/6] added additional logs and fixed minor bugs Signed-off-by: Vladyslav Diachenko --- pkg/ingester/ingester.go | 18 +++++++++++- pkg/ingester/instance.go | 34 +++++++++++++++++------ pkg/ingester/owned_streams.go | 14 +++++++++- pkg/ingester/owned_streams_test.go | 3 +- pkg/ingester/recalculate_owned_streams.go | 32 ++++++++++++++++----- 5 files changed, 82 insertions(+), 19 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 1f7b2483f0fb1..7223d923ffe37 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -914,7 +914,23 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker) + inst, err = newInstanceWithLogger( + &i.cfg, + i.periodicConfigs, + instanceID, + i.limiter, + i.tenantConfigs, + i.wal, + i.metrics, + i.flushOnShutdownSwitch, + i.chunkFilter, + i.pipelineWrapper, + i.extractorWrapper, + i.streamRateCalculator, + i.writeLogManager, + i.customStreamsTracker, + i.logger, + ) if err != nil { return nil, err } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 1d30e7e23ece4..318f9e322dbef 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" + logg "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" @@ -132,7 +133,7 @@ type instance struct { customStreamsTracker push.UsageTracker } -func newInstance( +func newInstanceWithLogger( cfg *Config, periodConfigs []config.PeriodConfig, instanceID string, @@ -147,13 +148,14 @@ func newInstance( streamRateCalculator *StreamRateCalculator, writeFailures *writefailures.Manager, customStreamsTracker push.UsageTracker, + logger logg.Logger, ) (*instance, error) { invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) if err != nil { return nil, err } streams := newStreamsMap() - ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) + ownedStreamsSvc := newOwnedStreamService(instanceID, limiter, logger) c := config.SchemaConfig{Configs: periodConfigs} i := &instance{ cfg: cfg, @@ -191,6 +193,25 @@ func newInstance( return i, err } +func newInstance( + cfg *Config, + periodConfigs []config.PeriodConfig, + instanceID string, + limiter *Limiter, + configs *runtime.TenantConfigs, + wal WAL, + metrics *ingesterMetrics, + flushOnShutdownSwitch *OnceSwitch, + chunkFilter chunk.RequestChunkFilterer, + pipelineWrapper log.PipelineWrapper, + extractorWrapper log.SampleExtractorWrapper, + streamRateCalculator *StreamRateCalculator, + writeFailures *writefailures.Manager, + customStreamsTracker push.UsageTracker, +) (*instance, error) { + return newInstanceWithLogger(cfg, periodConfigs, instanceID, limiter, configs, wal, metrics, flushOnShutdownSwitch, chunkFilter, pipelineWrapper, extractorWrapper, streamRateCalculator, writeFailures, customStreamsTracker, util_log.Logger) +} + // consumeChunk manually adds a chunk that was received during ingester chunk // transfer. func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error { @@ -1177,17 +1198,12 @@ func minTs(stream *logproto.Stream) model.Time { } // For each stream, we check if the stream is owned by the ingester or not and increment/decrement the owned stream count. -func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) error { - var err error +func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) { i.streams.WithLock(func() { i.ownedStreamsSvc.resetStreamCounts() - err = i.streams.ForEach(func(s *stream) (bool, error) { + _ = i.streams.ForEach(func(s *stream) (bool, error) { i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp))) return true, nil }) }) - if err != nil { - return fmt.Errorf("error checking streams ownership: %w", err) - } - return nil } diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 55f7eb480482b..9cfbadabe4c73 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -3,6 +3,8 @@ package ingester import ( "sync" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -24,14 +26,16 @@ type ownedStreamService struct { ownedStreamCount int lock sync.RWMutex notOwnedStreams map[model.Fingerprint]any + logger log.Logger } -func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { +func newOwnedStreamService(tenantID string, limiter *Limiter, logger log.Logger) *ownedStreamService { svc := &ownedStreamService{ tenantID: tenantID, limiter: limiter, fixedLimit: atomic.NewInt32(0), notOwnedStreams: make(map[model.Fingerprint]any), + logger: logger, } svc.updateFixedLimit() @@ -45,6 +49,7 @@ func (s *ownedStreamService) getOwnedStreamCount() int { } func (s *ownedStreamService) updateFixedLimit() { + level.Debug(s.logger).Log("msg", "updating fixed limit", "tenant", s.tenantID) limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) s.fixedLimit.Store(int32(limit)) } @@ -60,6 +65,7 @@ func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bo s.ownedStreamCount++ return } + level.Debug(s.logger).Log("msg", "stream is marked as not owned", "fingerprint", fp) notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc() s.notOwnedStreams[fp] = nil } @@ -69,16 +75,20 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) { defer s.lock.Unlock() if _, notOwned := s.notOwnedStreams[fp]; notOwned { + level.Debug(s.logger).Log("msg", "removing not owned stream", "fingerprint", fp) notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec() delete(s.notOwnedStreams, fp) return } + level.Debug(s.logger).Log("msg", "removing owned stream", "fingerprint", fp) s.ownedStreamCount-- } func (s *ownedStreamService) resetStreamCounts() { s.lock.Lock() defer s.lock.Unlock() + //todo remove or pass a logger from the ingester + level.Debug(s.logger).Log("msg", "resetting stream counts") s.ownedStreamCount = 0 notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0) s.notOwnedStreams = make(map[model.Fingerprint]any) @@ -89,5 +99,7 @@ func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool { defer s.lock.RUnlock() _, notOwned := s.notOwnedStreams[fp] + // todo remove + level.Debug(s.logger).Log("msg", "checking stream ownership", "fingerprint", fp, "notOwned", notOwned) return notOwned } diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go index 7f114922fa447..f1a9600b6ca7c 100644 --- a/pkg/ingester/owned_streams_test.go +++ b/pkg/ingester/owned_streams_test.go @@ -1,6 +1,7 @@ package ingester import ( + "github.com/go-kit/log" "sync" "testing" @@ -19,7 +20,7 @@ func Test_OwnedStreamService(t *testing.T) { ring := &ringCountMock{count: 30} limiter := NewLimiter(limits, NilMetrics, ring, 3) - service := newOwnedStreamService("test", limiter) + service := newOwnedStreamService("test", limiter, log.NewNopLogger()) require.Equal(t, 0, service.getOwnedStreamCount()) require.Equal(t, 10, service.getFixedLimit(), "fixed limit must be initialised during the instantiation") diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go index 59f8fe6b9269d..748089c35525b 100644 --- a/pkg/ingester/recalculate_owned_streams.go +++ b/pkg/ingester/recalculate_owned_streams.go @@ -3,6 +3,7 @@ package ingester import ( "context" "errors" + "fmt" "time" "github.com/go-kit/log" @@ -11,8 +12,6 @@ import ( "github.com/grafana/dskit/services" ) -var ownedStreamRingOp = ring.NewOp([]ring.InstanceState{ring.PENDING, ring.JOINING, ring.ACTIVE, ring.LEAVING}, nil) - type recalculateOwnedStreams struct { services.Service @@ -42,17 +41,25 @@ func (s *recalculateOwnedStreams) iteration(_ context.Context) error { } func (s *recalculateOwnedStreams) recalculate() { + level.Info(s.logger).Log("msg", "starting recalculate owned streams job") + defer func() { + s.updateFixedLimitForAll() + level.Info(s.logger).Log("msg", "completed recalculate owned streams job") + }() ringChanged, err := s.checkRingForChanges() if err != nil { level.Error(s.logger).Log("msg", "failed to check ring for changes", "err", err) return } if !ringChanged { + level.Debug(s.logger).Log("msg", "ring is not changed, skipping the job") return } + level.Info(s.logger).Log("msg", "detected ring changes, re-evaluating streams ownership") ownedTokenRange, err := s.getTokenRangesForIngester() if err != nil { level.Error(s.logger).Log("msg", "failed to get token ranges for ingester", "err", err) + s.resetRingState() return } @@ -60,20 +67,30 @@ func (s *recalculateOwnedStreams) recalculate() { if !instance.limiter.limits.UseOwnedStreamCount(instance.instanceID) { continue } - err = instance.updateOwnedStreams(ownedTokenRange) - if err != nil { - level.Error(s.logger).Log("msg", "failed to update owned streams", "err", err) - } + + instance.updateOwnedStreams(ownedTokenRange) + } +} + +func (s *recalculateOwnedStreams) updateFixedLimitForAll() { + for _, instance := range s.instancesSupplier() { instance.ownedStreamsSvc.updateFixedLimit() } } +func (s *recalculateOwnedStreams) resetRingState() { + s.previousRing = ring.ReplicationSet{} +} + func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { - rs, err := s.ingestersRing.GetAllHealthy(ownedStreamRingOp) + rs, err := s.ingestersRing.GetAllHealthy(ring.WriteNoExtend) if err != nil { return false, err } + //todo remove + level.Debug(s.logger).Log("previous ring", fmt.Sprintf("%+v", s.previousRing)) + level.Debug(s.logger).Log("new ring", fmt.Sprintf("%+v", rs)) ringChanged := ring.HasReplicationSetChangedWithoutStateOrAddr(s.previousRing, rs) s.previousRing = rs return ringChanged, nil @@ -83,6 +100,7 @@ func (s *recalculateOwnedStreams) getTokenRangesForIngester() (ring.TokenRanges, ranges, err := s.ingestersRing.GetTokenRangesForInstance(s.ingesterID) if err != nil { if errors.Is(err, ring.ErrInstanceNotFound) { + level.Debug(s.logger).Log("msg", "failed to get token ranges for ingester", "ingesterID", s.ingesterID, "err", err) return nil, nil } return nil, err From 0ba5e5950f271521471e80f91d4948265b165049 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Fri, 21 Jun 2024 16:32:37 +0300 Subject: [PATCH 2/6] changed implementation to evaluate streams' ownership using the same method which is used by distributor to select the ingesters. Signed-off-by: Vladyslav Diachenko --- pkg/ingester/instance.go | 37 +++- pkg/ingester/owned_streams.go | 6 +- pkg/ingester/recalculate_owned_streams.go | 25 +-- .../recalculate_owned_streams_test.go | 176 +++++++++--------- 4 files changed, 124 insertions(+), 120 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 318f9e322dbef..5a2453f98d181 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,14 +10,11 @@ import ( "syscall" "time" - "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" - - "github.com/grafana/loki/v3/pkg/util/httpreq" - logg "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/tenant" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -28,8 +25,6 @@ import ( tsdb_record "github.com/prometheus/prometheus/tsdb/record" "go.uber.org/atomic" - "github.com/grafana/dskit/tenant" - "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/chunkenc" "github.com/grafana/loki/v3/pkg/distributor/writefailures" @@ -41,6 +36,7 @@ import ( "github.com/grafana/loki/v3/pkg/logql" "github.com/grafana/loki/v3/pkg/logql/log" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/logqlmodel/metadata" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/storage/chunk" @@ -49,8 +45,10 @@ import ( "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/deletion" + "github.com/grafana/loki/v3/pkg/util/httpreq" util_log "github.com/grafana/loki/v3/pkg/util/log" mathutil "github.com/grafana/loki/v3/pkg/util/math" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" server_util "github.com/grafana/loki/v3/pkg/util/server" "github.com/grafana/loki/v3/pkg/validation" ) @@ -1197,13 +1195,34 @@ func minTs(stream *logproto.Stream) model.Time { return model.TimeFromUnixNano(streamMinTs) } +var streamTokenGenerator = lokiring.TokenFor + // For each stream, we check if the stream is owned by the ingester or not and increment/decrement the owned stream count. -func (i *instance) updateOwnedStreams(ownedTokenRange ring.TokenRanges) { +func (i *instance) updateOwnedStreams(ingesterRing ring.ReadRing, ingesterID string) error { + var descsBuf = make([]ring.InstanceDesc, ingesterRing.ReplicationFactor()+1) + var hostsBuf = make([]string, ingesterRing.ReplicationFactor()+1) + var zoneBuf = make([]string, ingesterRing.ZonesCount()+1) + var err error i.streams.WithLock(func() { i.ownedStreamsSvc.resetStreamCounts() - _ = i.streams.ForEach(func(s *stream) (bool, error) { - i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedTokenRange.IncludesKey(uint32(s.fp))) + err = i.streams.ForEach(func(s *stream) (bool, error) { + replicationSet, err := ingesterRing.Get(streamTokenGenerator(i.instanceID, s.labelsString), ring.WriteNoExtend, descsBuf, hostsBuf, zoneBuf) + if err != nil { + return false, fmt.Errorf("error getting replication set for stream %s: %v", s.labelsString, err) + } + ownedStream := i.isOwnedStream(replicationSet, ingesterID) + i.ownedStreamsSvc.trackStreamOwnership(s.fp, ownedStream) return true, nil }) }) + return err +} + +func (i *instance) isOwnedStream(replicationSet ring.ReplicationSet, ingesterID string) bool { + for _, instanceDesc := range replicationSet.Instances { + if instanceDesc.Id == ingesterID { + return true + } + } + return false } diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 9cfbadabe4c73..375702d3c17d7 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -49,9 +49,9 @@ func (s *ownedStreamService) getOwnedStreamCount() int { } func (s *ownedStreamService) updateFixedLimit() { - level.Debug(s.logger).Log("msg", "updating fixed limit", "tenant", s.tenantID) - limit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) - s.fixedLimit.Store(int32(limit)) + newLimit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) + oldLimit := s.fixedLimit.Swap(int32(newLimit)) + level.Debug(s.logger).Log("msg", "updating fixed limit", "tenant", s.tenantID, "old_limit", oldLimit, "new_limit", newLimit) } func (s *ownedStreamService) getFixedLimit() int { diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go index 748089c35525b..ad32eafeeefdb 100644 --- a/pkg/ingester/recalculate_owned_streams.go +++ b/pkg/ingester/recalculate_owned_streams.go @@ -2,7 +2,6 @@ package ingester import ( "context" - "errors" "fmt" "time" @@ -56,19 +55,16 @@ func (s *recalculateOwnedStreams) recalculate() { return } level.Info(s.logger).Log("msg", "detected ring changes, re-evaluating streams ownership") - ownedTokenRange, err := s.getTokenRangesForIngester() - if err != nil { - level.Error(s.logger).Log("msg", "failed to get token ranges for ingester", "err", err) - s.resetRingState() - return - } for _, instance := range s.instancesSupplier() { if !instance.limiter.limits.UseOwnedStreamCount(instance.instanceID) { continue } - instance.updateOwnedStreams(ownedTokenRange) + err := instance.updateOwnedStreams(s.ingestersRing, s.ingesterID) + if err != nil { + level.Error(s.logger).Log("msg", "failed to re-evaluate streams ownership", "tenant", instance.instanceID, "err", err) + } } } @@ -95,16 +91,3 @@ func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { s.previousRing = rs return ringChanged, nil } - -func (s *recalculateOwnedStreams) getTokenRangesForIngester() (ring.TokenRanges, error) { - ranges, err := s.ingestersRing.GetTokenRangesForInstance(s.ingesterID) - if err != nil { - if errors.Is(err, ring.ErrInstanceNotFound) { - level.Debug(s.logger).Log("msg", "failed to get token ranges for ingester", "ingesterID", s.ingesterID, "err", err) - return nil, nil - } - return nil, err - } - - return ranges, nil -} diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index dda027ca2e443..589895df76f01 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -11,9 +11,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - - "github.com/grafana/loki/v3/pkg/runtime" - "github.com/grafana/loki/v3/pkg/validation" ) func Test_recalculateOwnedStreams_newRecalculateOwnedStreams(t *testing.T) { @@ -31,90 +28,95 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreams(t *testing.T) { }, 1*time.Second, 50*time.Millisecond, "expected at least two runs of the iteration") } -func Test_recalculateOwnedStreams_recalculate(t *testing.T) { - tests := map[string]struct { - featureEnabled bool - expectedOwnedStreamCount int - expectedNotOwnedStreamCount int - }{ - "expected streams ownership to be recalculated": { - featureEnabled: true, - expectedOwnedStreamCount: 4, - expectedNotOwnedStreamCount: 3, - }, - "expected streams ownership recalculation to be skipped": { - featureEnabled: false, - expectedOwnedStreamCount: 7, - }, - } - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - mockRing := &readRingMock{ - replicationSet: ring.ReplicationSet{ - Instances: []ring.InstanceDesc{{Addr: "ingester-0", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, - }, - tokenRangesByIngester: map[string]ring.TokenRanges{ - // this ingester owns token ranges [50, 100] and [200, 300] - "ingester-0": {50, 100, 200, 300}, - }, - } - - limits, err := validation.NewOverrides(validation.Limits{ - MaxGlobalStreamsPerUser: 100, - UseOwnedStreamCount: testData.featureEnabled, - }, nil) - require.NoError(t, err) - limiter := NewLimiter(limits, NilMetrics, mockRing, 1) - - tenant, err := newInstance( - defaultConfig(), - defaultPeriodConfigs, - "tenant-a", - limiter, - runtime.DefaultTenantConfigs(), - noopWAL{}, - NilMetrics, - nil, - nil, - nil, - nil, - NewStreamRateCalculator(), - nil, - nil, - ) - require.NoError(t, err) - require.Equal(t, 100, tenant.ownedStreamsSvc.getFixedLimit(), "MaxGlobalStreamsPerUser is 100 at this moment") - // not owned streams - createStream(t, tenant, 49) - createStream(t, tenant, 101) - createStream(t, tenant, 301) - - // owned streams - createStream(t, tenant, 50) - createStream(t, tenant, 60) - createStream(t, tenant, 100) - createStream(t, tenant, 250) - - require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) - require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) - - mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} - - service := newRecalculateOwnedStreams(mockTenantsSupplier.get, "ingester-0", mockRing, 50*time.Millisecond, log.NewNopLogger()) - //change the limit to assert that fixed limit is updated after the recalculation - limits.DefaultLimits().MaxGlobalStreamsPerUser = 50 - - service.recalculate() - - if testData.featureEnabled { - require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") - } - require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) - require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) - }) - } - -} +//func Test_recalculateOwnedStreams_recalculate(t *testing.T) { +// tests := map[string]struct { +// featureEnabled bool +// expectedOwnedStreamCount int +// expectedNotOwnedStreamCount int +// }{ +// "expected streams ownership to be recalculated": { +// featureEnabled: true, +// expectedOwnedStreamCount: 4, +// expectedNotOwnedStreamCount: 3, +// }, +// "expected streams ownership recalculation to be skipped": { +// featureEnabled: false, +// expectedOwnedStreamCount: 7, +// }, +// } +// for testName, testData := range tests { +// t.Run(testName, func(t *testing.T) { +// originalTokenGenerator := streamTokenGenerator +// t.Cleanup(func() { +// streamTokenGenerator = originalTokenGenerator +// }) +// streamTokenGenerator() +// mockRing := &readRingMock{ +// replicationSet: ring.ReplicationSet{ +// Instances: []ring.InstanceDesc{{Addr: "ingester-0", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, +// }, +// tokenRangesByIngester: map[string]ring.TokenRanges{ +// // this ingester owns token ranges [50, 100] and [200, 300] +// "ingester-0": {50, 100, 200, 300}, +// }, +// } +// +// limits, err := validation.NewOverrides(validation.Limits{ +// MaxGlobalStreamsPerUser: 100, +// UseOwnedStreamCount: testData.featureEnabled, +// }, nil) +// require.NoError(t, err) +// limiter := NewLimiter(limits, NilMetrics, mockRing, 1) +// +// tenant, err := newInstance( +// defaultConfig(), +// defaultPeriodConfigs, +// "tenant-a", +// limiter, +// runtime.DefaultTenantConfigs(), +// noopWAL{}, +// NilMetrics, +// nil, +// nil, +// nil, +// nil, +// NewStreamRateCalculator(), +// nil, +// nil, +// ) +// require.NoError(t, err) +// require.Equal(t, 100, tenant.ownedStreamsSvc.getFixedLimit(), "MaxGlobalStreamsPerUser is 100 at this moment") +// // not owned streams +// createStream(t, tenant, 49) +// createStream(t, tenant, 101) +// createStream(t, tenant, 301) +// +// // owned streams +// createStream(t, tenant, 50) +// createStream(t, tenant, 60) +// createStream(t, tenant, 100) +// createStream(t, tenant, 250) +// +// require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) +// require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) +// +// mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} +// +// service := newRecalculateOwnedStreams(mockTenantsSupplier.get, "ingester-0", mockRing, 50*time.Millisecond, log.NewNopLogger()) +// //change the limit to assert that fixed limit is updated after the recalculation +// limits.DefaultLimits().MaxGlobalStreamsPerUser = 50 +// +// service.recalculate() +// +// if testData.featureEnabled { +// require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") +// } +// require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) +// require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) +// }) +// } +// +//} func Test_recalculateOwnedStreams_checkRingForChanges(t *testing.T) { mockRing := &readRingMock{ From 9b601f52fa8aba3174bbb31b57bbf0a8105be69a Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 25 Jun 2024 16:57:15 +0300 Subject: [PATCH 3/6] removed redundant logging Signed-off-by: Vladyslav Diachenko --- pkg/ingester/ingester.go | 18 +------------ pkg/ingester/instance.go | 25 ++---------------- pkg/ingester/owned_streams.go | 31 ++++++++--------------- pkg/ingester/recalculate_owned_streams.go | 10 ++++---- 4 files changed, 18 insertions(+), 66 deletions(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 70407b54ca721..af7f1fde288c9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -919,23 +919,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error - inst, err = newInstanceWithLogger( - &i.cfg, - i.periodicConfigs, - instanceID, - i.limiter, - i.tenantConfigs, - i.wal, - i.metrics, - i.flushOnShutdownSwitch, - i.chunkFilter, - i.pipelineWrapper, - i.extractorWrapper, - i.streamRateCalculator, - i.writeLogManager, - i.customStreamsTracker, - i.logger, - ) + inst, err = newInstance(&i.cfg, i.periodicConfigs, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch, i.chunkFilter, i.pipelineWrapper, i.extractorWrapper, i.streamRateCalculator, i.writeLogManager, i.customStreamsTracker) if err != nil { return nil, err } diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 5a2453f98d181..03dcfcf18470b 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,7 +10,6 @@ import ( "syscall" "time" - logg "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/ring" @@ -131,7 +130,7 @@ type instance struct { customStreamsTracker push.UsageTracker } -func newInstanceWithLogger( +func newInstance( cfg *Config, periodConfigs []config.PeriodConfig, instanceID string, @@ -146,14 +145,13 @@ func newInstanceWithLogger( streamRateCalculator *StreamRateCalculator, writeFailures *writefailures.Manager, customStreamsTracker push.UsageTracker, - logger logg.Logger, ) (*instance, error) { invertedIndex, err := index.NewMultiInvertedIndex(periodConfigs, uint32(cfg.IndexShards)) if err != nil { return nil, err } streams := newStreamsMap() - ownedStreamsSvc := newOwnedStreamService(instanceID, limiter, logger) + ownedStreamsSvc := newOwnedStreamService(instanceID, limiter) c := config.SchemaConfig{Configs: periodConfigs} i := &instance{ cfg: cfg, @@ -191,25 +189,6 @@ func newInstanceWithLogger( return i, err } -func newInstance( - cfg *Config, - periodConfigs []config.PeriodConfig, - instanceID string, - limiter *Limiter, - configs *runtime.TenantConfigs, - wal WAL, - metrics *ingesterMetrics, - flushOnShutdownSwitch *OnceSwitch, - chunkFilter chunk.RequestChunkFilterer, - pipelineWrapper log.PipelineWrapper, - extractorWrapper log.SampleExtractorWrapper, - streamRateCalculator *StreamRateCalculator, - writeFailures *writefailures.Manager, - customStreamsTracker push.UsageTracker, -) (*instance, error) { - return newInstanceWithLogger(cfg, periodConfigs, instanceID, limiter, configs, wal, metrics, flushOnShutdownSwitch, chunkFilter, pipelineWrapper, extractorWrapper, streamRateCalculator, writeFailures, customStreamsTracker, util_log.Logger) -} - // consumeChunk manually adds a chunk that was received during ingester chunk // transfer. func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *logproto.Chunk) error { diff --git a/pkg/ingester/owned_streams.go b/pkg/ingester/owned_streams.go index 375702d3c17d7..3bb729815e718 100644 --- a/pkg/ingester/owned_streams.go +++ b/pkg/ingester/owned_streams.go @@ -3,8 +3,6 @@ package ingester import ( "sync" - "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -13,11 +11,11 @@ import ( "github.com/grafana/loki/v3/pkg/util/constants" ) -var notOwnedStreamsMetric = promauto.NewGaugeVec(prometheus.GaugeOpts{ +var notOwnedStreamsMetric = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: constants.Loki, Name: "ingester_not_owned_streams", - Help: "The total number of not owned streams in memory per tenant.", -}, []string{"tenant"}) + Help: "The total number of not owned streams in memory.", +}) type ownedStreamService struct { tenantID string @@ -26,16 +24,14 @@ type ownedStreamService struct { ownedStreamCount int lock sync.RWMutex notOwnedStreams map[model.Fingerprint]any - logger log.Logger } -func newOwnedStreamService(tenantID string, limiter *Limiter, logger log.Logger) *ownedStreamService { +func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService { svc := &ownedStreamService{ tenantID: tenantID, limiter: limiter, fixedLimit: atomic.NewInt32(0), notOwnedStreams: make(map[model.Fingerprint]any), - logger: logger, } svc.updateFixedLimit() @@ -48,10 +44,10 @@ func (s *ownedStreamService) getOwnedStreamCount() int { return s.ownedStreamCount } -func (s *ownedStreamService) updateFixedLimit() { +func (s *ownedStreamService) updateFixedLimit() (old, new int32) { newLimit, _, _, _ := s.limiter.GetStreamCountLimit(s.tenantID) - oldLimit := s.fixedLimit.Swap(int32(newLimit)) - level.Debug(s.logger).Log("msg", "updating fixed limit", "tenant", s.tenantID, "old_limit", oldLimit, "new_limit", newLimit) + return s.fixedLimit.Swap(int32(newLimit)), int32(newLimit) + } func (s *ownedStreamService) getFixedLimit() int { @@ -65,8 +61,7 @@ func (s *ownedStreamService) trackStreamOwnership(fp model.Fingerprint, owned bo s.ownedStreamCount++ return } - level.Debug(s.logger).Log("msg", "stream is marked as not owned", "fingerprint", fp) - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Inc() + notOwnedStreamsMetric.Inc() s.notOwnedStreams[fp] = nil } @@ -75,22 +70,18 @@ func (s *ownedStreamService) trackRemovedStream(fp model.Fingerprint) { defer s.lock.Unlock() if _, notOwned := s.notOwnedStreams[fp]; notOwned { - level.Debug(s.logger).Log("msg", "removing not owned stream", "fingerprint", fp) - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Dec() + notOwnedStreamsMetric.Dec() delete(s.notOwnedStreams, fp) return } - level.Debug(s.logger).Log("msg", "removing owned stream", "fingerprint", fp) s.ownedStreamCount-- } func (s *ownedStreamService) resetStreamCounts() { s.lock.Lock() defer s.lock.Unlock() - //todo remove or pass a logger from the ingester - level.Debug(s.logger).Log("msg", "resetting stream counts") s.ownedStreamCount = 0 - notOwnedStreamsMetric.WithLabelValues(s.tenantID).Set(0) + notOwnedStreamsMetric.Sub(float64(len(s.notOwnedStreams))) s.notOwnedStreams = make(map[model.Fingerprint]any) } @@ -99,7 +90,5 @@ func (s *ownedStreamService) isStreamNotOwned(fp model.Fingerprint) bool { defer s.lock.RUnlock() _, notOwned := s.notOwnedStreams[fp] - // todo remove - level.Debug(s.logger).Log("msg", "checking stream ownership", "fingerprint", fp, "notOwned", notOwned) return notOwned } diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go index ad32eafeeefdb..3d645862dee08 100644 --- a/pkg/ingester/recalculate_owned_streams.go +++ b/pkg/ingester/recalculate_owned_streams.go @@ -2,7 +2,6 @@ package ingester import ( "context" - "fmt" "time" "github.com/go-kit/log" @@ -61,6 +60,7 @@ func (s *recalculateOwnedStreams) recalculate() { continue } + level.Info(s.logger).Log("msg", "updating streams ownership", "tenant", instance.instanceID) err := instance.updateOwnedStreams(s.ingestersRing, s.ingesterID) if err != nil { level.Error(s.logger).Log("msg", "failed to re-evaluate streams ownership", "tenant", instance.instanceID, "err", err) @@ -70,7 +70,10 @@ func (s *recalculateOwnedStreams) recalculate() { func (s *recalculateOwnedStreams) updateFixedLimitForAll() { for _, instance := range s.instancesSupplier() { - instance.ownedStreamsSvc.updateFixedLimit() + oldLimit, newLimit := instance.ownedStreamsSvc.updateFixedLimit() + if oldLimit != newLimit { + level.Info(s.logger).Log("msg", "fixed limit has been updated", "tenant", instance.instanceID, "old", oldLimit, "new", newLimit) + } } } @@ -84,9 +87,6 @@ func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { return false, err } - //todo remove - level.Debug(s.logger).Log("previous ring", fmt.Sprintf("%+v", s.previousRing)) - level.Debug(s.logger).Log("new ring", fmt.Sprintf("%+v", rs)) ringChanged := ring.HasReplicationSetChangedWithoutStateOrAddr(s.previousRing, rs) s.previousRing = rs return ringChanged, nil From a31c7b4ab9f2d989a3ae03e4de6bfecee7a0b644 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Tue, 25 Jun 2024 19:34:54 +0300 Subject: [PATCH 4/6] refactored code and fixed test Signed-off-by: Vladyslav Diachenko --- pkg/ingester/instance.go | 4 +- pkg/ingester/owned_streams_test.go | 3 +- .../recalculate_owned_streams_test.go | 221 ++++++++++-------- 3 files changed, 130 insertions(+), 98 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 03dcfcf18470b..5332622bd4b65 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -1174,8 +1174,6 @@ func minTs(stream *logproto.Stream) model.Time { return model.TimeFromUnixNano(streamMinTs) } -var streamTokenGenerator = lokiring.TokenFor - // For each stream, we check if the stream is owned by the ingester or not and increment/decrement the owned stream count. func (i *instance) updateOwnedStreams(ingesterRing ring.ReadRing, ingesterID string) error { var descsBuf = make([]ring.InstanceDesc, ingesterRing.ReplicationFactor()+1) @@ -1185,7 +1183,7 @@ func (i *instance) updateOwnedStreams(ingesterRing ring.ReadRing, ingesterID str i.streams.WithLock(func() { i.ownedStreamsSvc.resetStreamCounts() err = i.streams.ForEach(func(s *stream) (bool, error) { - replicationSet, err := ingesterRing.Get(streamTokenGenerator(i.instanceID, s.labelsString), ring.WriteNoExtend, descsBuf, hostsBuf, zoneBuf) + replicationSet, err := ingesterRing.Get(lokiring.TokenFor(i.instanceID, s.labelsString), ring.WriteNoExtend, descsBuf, hostsBuf, zoneBuf) if err != nil { return false, fmt.Errorf("error getting replication set for stream %s: %v", s.labelsString, err) } diff --git a/pkg/ingester/owned_streams_test.go b/pkg/ingester/owned_streams_test.go index f1a9600b6ca7c..7f114922fa447 100644 --- a/pkg/ingester/owned_streams_test.go +++ b/pkg/ingester/owned_streams_test.go @@ -1,7 +1,6 @@ package ingester import ( - "github.com/go-kit/log" "sync" "testing" @@ -20,7 +19,7 @@ func Test_OwnedStreamService(t *testing.T) { ring := &ringCountMock{count: 30} limiter := NewLimiter(limits, NilMetrics, ring, 3) - service := newOwnedStreamService("test", limiter, log.NewNopLogger()) + service := newOwnedStreamService("test", limiter) require.Equal(t, 0, service.getOwnedStreamCount()) require.Equal(t, 10, service.getFixedLimit(), "fixed limit must be initialised during the instantiation") diff --git a/pkg/ingester/recalculate_owned_streams_test.go b/pkg/ingester/recalculate_owned_streams_test.go index 589895df76f01..91b32baef820d 100644 --- a/pkg/ingester/recalculate_owned_streams_test.go +++ b/pkg/ingester/recalculate_owned_streams_test.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "fmt" "strconv" "testing" "time" @@ -11,6 +12,10 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/runtime" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" + "github.com/grafana/loki/v3/pkg/validation" ) func Test_recalculateOwnedStreams_newRecalculateOwnedStreams(t *testing.T) { @@ -28,95 +33,125 @@ func Test_recalculateOwnedStreams_newRecalculateOwnedStreams(t *testing.T) { }, 1*time.Second, 50*time.Millisecond, "expected at least two runs of the iteration") } -//func Test_recalculateOwnedStreams_recalculate(t *testing.T) { -// tests := map[string]struct { -// featureEnabled bool -// expectedOwnedStreamCount int -// expectedNotOwnedStreamCount int -// }{ -// "expected streams ownership to be recalculated": { -// featureEnabled: true, -// expectedOwnedStreamCount: 4, -// expectedNotOwnedStreamCount: 3, -// }, -// "expected streams ownership recalculation to be skipped": { -// featureEnabled: false, -// expectedOwnedStreamCount: 7, -// }, -// } -// for testName, testData := range tests { -// t.Run(testName, func(t *testing.T) { -// originalTokenGenerator := streamTokenGenerator -// t.Cleanup(func() { -// streamTokenGenerator = originalTokenGenerator -// }) -// streamTokenGenerator() -// mockRing := &readRingMock{ -// replicationSet: ring.ReplicationSet{ -// Instances: []ring.InstanceDesc{{Addr: "ingester-0", Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, -// }, -// tokenRangesByIngester: map[string]ring.TokenRanges{ -// // this ingester owns token ranges [50, 100] and [200, 300] -// "ingester-0": {50, 100, 200, 300}, -// }, -// } -// -// limits, err := validation.NewOverrides(validation.Limits{ -// MaxGlobalStreamsPerUser: 100, -// UseOwnedStreamCount: testData.featureEnabled, -// }, nil) -// require.NoError(t, err) -// limiter := NewLimiter(limits, NilMetrics, mockRing, 1) -// -// tenant, err := newInstance( -// defaultConfig(), -// defaultPeriodConfigs, -// "tenant-a", -// limiter, -// runtime.DefaultTenantConfigs(), -// noopWAL{}, -// NilMetrics, -// nil, -// nil, -// nil, -// nil, -// NewStreamRateCalculator(), -// nil, -// nil, -// ) -// require.NoError(t, err) -// require.Equal(t, 100, tenant.ownedStreamsSvc.getFixedLimit(), "MaxGlobalStreamsPerUser is 100 at this moment") -// // not owned streams -// createStream(t, tenant, 49) -// createStream(t, tenant, 101) -// createStream(t, tenant, 301) -// -// // owned streams -// createStream(t, tenant, 50) -// createStream(t, tenant, 60) -// createStream(t, tenant, 100) -// createStream(t, tenant, 250) -// -// require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) -// require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) -// -// mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} -// -// service := newRecalculateOwnedStreams(mockTenantsSupplier.get, "ingester-0", mockRing, 50*time.Millisecond, log.NewNopLogger()) -// //change the limit to assert that fixed limit is updated after the recalculation -// limits.DefaultLimits().MaxGlobalStreamsPerUser = 50 -// -// service.recalculate() -// -// if testData.featureEnabled { -// require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") -// } -// require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) -// require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) -// }) -// } -// -//} +func Test_recalculateOwnedStreams_recalculate(t *testing.T) { + tests := map[string]struct { + featureEnabled bool + expectedOwnedStreamCount int + expectedNotOwnedStreamCount int + }{ + "expected streams ownership to be recalculated": { + featureEnabled: true, + expectedOwnedStreamCount: 4, + expectedNotOwnedStreamCount: 3, + }, + "expected streams ownership recalculation to be skipped": { + featureEnabled: false, + expectedOwnedStreamCount: 7, + }, + } + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + currentIngesterName := "ingester-0" + tenantName := "tenant-a" + + mockRing := &mockStreamsOwnershipRing{ + currentIngesterName: currentIngesterName, + tenantName: tenantName, + readRingMock: readRingMock{ + replicationSet: ring.ReplicationSet{ + Instances: []ring.InstanceDesc{{Addr: currentIngesterName, Timestamp: time.Now().UnixNano(), State: ring.ACTIVE, Tokens: []uint32{100, 200, 300}}}, + }, + }, + } + + limits, err := validation.NewOverrides(validation.Limits{ + MaxGlobalStreamsPerUser: 100, + UseOwnedStreamCount: testData.featureEnabled, + }, nil) + require.NoError(t, err) + limiter := NewLimiter(limits, NilMetrics, mockRing, 1) + + tenant, err := newInstance( + defaultConfig(), + defaultPeriodConfigs, + tenantName, + limiter, + runtime.DefaultTenantConfigs(), + noopWAL{}, + NilMetrics, + nil, + nil, + nil, + nil, + NewStreamRateCalculator(), + nil, + nil, + ) + require.NoError(t, err) + require.Equal(t, 100, tenant.ownedStreamsSvc.getFixedLimit(), "MaxGlobalStreamsPerUser is 100 at this moment") + // not owned streams + mockRing.addMapping(createStream(t, tenant, 49), false) + mockRing.addMapping(createStream(t, tenant, 101), false) + mockRing.addMapping(createStream(t, tenant, 301), false) + + // owned streams + mockRing.addMapping(createStream(t, tenant, 50), true) + mockRing.addMapping(createStream(t, tenant, 60), true) + mockRing.addMapping(createStream(t, tenant, 100), true) + mockRing.addMapping(createStream(t, tenant, 250), true) + + require.Equal(t, 7, tenant.ownedStreamsSvc.ownedStreamCount) + require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, 0) + + mockTenantsSupplier := &mockTenantsSuplier{tenants: []*instance{tenant}} + + service := newRecalculateOwnedStreams(mockTenantsSupplier.get, currentIngesterName, mockRing, 50*time.Millisecond, log.NewNopLogger()) + //change the limit to assert that fixed limit is updated after the recalculation + limits.DefaultLimits().MaxGlobalStreamsPerUser = 50 + + service.recalculate() + + if testData.featureEnabled { + require.Equal(t, 50, tenant.ownedStreamsSvc.getFixedLimit(), "fixed limit must be updated after recalculation") + } + require.Equal(t, testData.expectedOwnedStreamCount, tenant.ownedStreamsSvc.ownedStreamCount) + require.Len(t, tenant.ownedStreamsSvc.notOwnedStreams, testData.expectedNotOwnedStreamCount) + }) + } + +} + +type mockStreamsOwnershipRing struct { + readRingMock + currentIngesterName string + tenantName string + streamMapping map[uint32]ring.ReplicationSet +} + +func (r *mockStreamsOwnershipRing) addMapping(stream *stream, owned bool) { + instanceDescs := make([]ring.InstanceDesc, 0, 3) + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-444"}) + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-555"}) + if owned { + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: r.currentIngesterName}) + } else { + instanceDescs = append(instanceDescs, ring.InstanceDesc{Id: "ingester-333"}) + } + if r.streamMapping == nil { + r.streamMapping = make(map[uint32]ring.ReplicationSet) + } + r.streamMapping[lokiring.TokenFor(r.tenantName, stream.labelsString)] = ring.ReplicationSet{ + Instances: instanceDescs, + } +} + +func (r *mockStreamsOwnershipRing) Get(streamToken uint32, _ ring.Operation, _ []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { + set, found := r.streamMapping[streamToken] + if !found { + return ring.ReplicationSet{}, fmt.Errorf("replication set mapping is not found for stream hash: %v", streamToken) + } + return set, nil +} func Test_recalculateOwnedStreams_checkRingForChanges(t *testing.T) { mockRing := &readRingMock{ @@ -143,14 +178,14 @@ func Test_recalculateOwnedStreams_checkRingForChanges(t *testing.T) { require.True(t, ringChanged) } -func createStream(t *testing.T, inst *instance, fingerprint int) { - lbls := labels.Labels{ - labels.Label{Name: "mock", Value: strconv.Itoa(fingerprint)}} +func createStream(t *testing.T, inst *instance, fingerprint int) *stream { + lbls := labels.Labels{labels.Label{Name: "mock", Value: strconv.Itoa(fingerprint)}} - _, _, err := inst.streams.LoadOrStoreNew(lbls.String(), func() (*stream, error) { + stream, _, err := inst.streams.LoadOrStoreNew(lbls.String(), func() (*stream, error) { return inst.createStreamByFP(lbls, model.Fingerprint(fingerprint)) }, nil) require.NoError(t, err) + return stream } type mockTenantsSuplier struct { From cd07ee03346a5311222d088cf7bb0589727ebe10 Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Wed, 26 Jun 2024 11:30:57 +0300 Subject: [PATCH 5/6] added histogram to observe the duration of ownership check and small cleanup Signed-off-by: Vladyslav Diachenko --- pkg/ingester/instance.go | 4 ++++ pkg/ingester/metrics.go | 11 ++++++++++- pkg/ingester/recalculate_owned_streams.go | 4 ---- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 5332622bd4b65..75b4c45f85969 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -1176,6 +1176,10 @@ func minTs(stream *logproto.Stream) model.Time { // For each stream, we check if the stream is owned by the ingester or not and increment/decrement the owned stream count. func (i *instance) updateOwnedStreams(ingesterRing ring.ReadRing, ingesterID string) error { + start := time.Now() + defer func() { + i.metrics.streamsOwnershipCheck.Observe(float64(time.Since(start).Milliseconds())) + }() var descsBuf = make([]ring.InstanceDesc, ingesterRing.ReplicationFactor()+1) var hostsBuf = make([]string, ingesterRing.ReplicationFactor()+1) var zoneBuf = make([]string, ingesterRing.ZonesCount()+1) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 756eba0ebea74..bb91ca2de6734 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -65,7 +65,8 @@ type ingesterMetrics struct { // Shutdown marker for ingester scale down shutdownMarker prometheus.Gauge - flushQueueLength prometheus.Gauge + flushQueueLength prometheus.Gauge + streamsOwnershipCheck prometheus.Histogram } // setRecoveryBytesInUse bounds the bytes reports to >= 0. @@ -293,5 +294,13 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Name: "flush_queue_length", Help: "The total number of series pending in the flush queue.", }), + + streamsOwnershipCheck: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ + Namespace: constants.Loki, + Name: "ingester_streams_ownership_check_duration_ms", + Help: "Distribution of streams ownership check durations in milliseconds.", + // 100ms to 5s. + Buckets: []float64{100, 250, 350, 500, 750, 1000, 1500, 2000, 5000}, + }), } } diff --git a/pkg/ingester/recalculate_owned_streams.go b/pkg/ingester/recalculate_owned_streams.go index 3d645862dee08..d3bf79d29f743 100644 --- a/pkg/ingester/recalculate_owned_streams.go +++ b/pkg/ingester/recalculate_owned_streams.go @@ -77,10 +77,6 @@ func (s *recalculateOwnedStreams) updateFixedLimitForAll() { } } -func (s *recalculateOwnedStreams) resetRingState() { - s.previousRing = ring.ReplicationSet{} -} - func (s *recalculateOwnedStreams) checkRingForChanges() (bool, error) { rs, err := s.ingestersRing.GetAllHealthy(ring.WriteNoExtend) if err != nil { From 8ef6982576bae3a2505b5c24bbd357a62992c55c Mon Sep 17 00:00:00 2001 From: Vladyslav Diachenko Date: Wed, 26 Jun 2024 18:39:00 +0300 Subject: [PATCH 6/6] fixed style Signed-off-by: Vladyslav Diachenko --- pkg/ingester/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index fc227f9d2c7c7..ad190285ccd08 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -65,9 +65,9 @@ type ingesterMetrics struct { // Shutdown marker for ingester scale down shutdownMarker prometheus.Gauge - flushQueueLength prometheus.Gauge + flushQueueLength prometheus.Gauge duplicateLogBytesTotal *prometheus.CounterVec - streamsOwnershipCheck prometheus.Histogram + streamsOwnershipCheck prometheus.Histogram } // setRecoveryBytesInUse bounds the bytes reports to >= 0.