From 573a1848d1ba717d5ad59f97701fd0ffca6c291e Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Tue, 30 Jul 2024 15:34:56 -0600 Subject: [PATCH] chore: add service_name label earlier in the ingestion pipeline (#13702) --- pkg/distributor/distributor.go | 16 ---- pkg/distributor/distributor_test.go | 125 +--------------------------- pkg/distributor/validator.go | 7 ++ pkg/loghttp/push/push.go | 47 +++++++++-- pkg/loghttp/push/push_test.go | 61 +++++++++++++- 5 files changed, 109 insertions(+), 147 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index cf6e8c841ea82..cd9524a9168ec 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -61,8 +61,6 @@ const ( ringAutoForgetUnhealthyPeriods = 2 - labelServiceName = "service_name" - serviceUnknown = "unknown_service" levelLabel = "detected_level" logLevelDebug = "debug" logLevelInfo = "info" @@ -789,20 +787,6 @@ func (d *Distributor) parseStreamLabels(vContext validationContext, key string, return nil, "", 0, err } - // We do not want to count service_name added by us in the stream limit so adding it after validating original labels. - if !ls.Has(labelServiceName) && len(vContext.discoverServiceName) > 0 { - serviceName := serviceUnknown - for _, labelName := range vContext.discoverServiceName { - if labelVal := ls.Get(labelName); labelVal != "" { - serviceName = labelVal - break - } - } - - ls = labels.NewBuilder(ls).Set(labelServiceName, serviceName).Labels() - stream.Labels = ls.String() - } - lsHash := ls.Hash() d.labelCache.Add(key, labelData{ls, lsHash}) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index e0621aa9b9dff..c21b1e2561cd2 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -103,7 +103,6 @@ func TestDistributor(t *testing.T) { t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil limits.IngestionRateMB = ingestionRateLimit limits.IngestionBurstSizeMB = ingestionRateLimit limits.MaxLineSize = fe.ByteSize(tc.maxLineSize) @@ -140,20 +139,17 @@ func TestDistributor(t *testing.T) { func Test_IncrementTimestamp(t *testing.T) { incrementingDisabled := &validation.Limits{} flagext.DefaultValues(incrementingDisabled) - incrementingDisabled.DiscoverServiceName = nil incrementingDisabled.RejectOldSamples = false incrementingDisabled.DiscoverLogLevels = false incrementingEnabled := &validation.Limits{} flagext.DefaultValues(incrementingEnabled) - incrementingEnabled.DiscoverServiceName = nil incrementingEnabled.RejectOldSamples = false incrementingEnabled.IncrementDuplicateTimestamp = true incrementingEnabled.DiscoverLogLevels = false defaultLimits := &validation.Limits{} flagext.DefaultValues(defaultLimits) - now := time.Now() defaultLimits.DiscoverLogLevels = false tests := map[string]struct { @@ -401,34 +397,6 @@ func Test_IncrementTimestamp(t *testing.T) { }, }, }, - "default limit adding service_name label": { - limits: defaultLimits, - push: &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: "{job=\"foo\"}", - Entries: []logproto.Entry{ - {Timestamp: now.Add(-2 * time.Second), Line: "hey1"}, - {Timestamp: now.Add(-time.Second), Line: "hey2"}, - {Timestamp: now, Line: "hey3"}, - }, - }, - }, - }, - expectedPush: &logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: "{job=\"foo\", service_name=\"foo\"}", - Hash: 0x86ca305b6d86e8b0, - Entries: []logproto.Entry{ - {Timestamp: now.Add(-2 * time.Second), Line: "hey1"}, - {Timestamp: now.Add(-time.Second), Line: "hey2"}, - {Timestamp: now, Line: "hey3"}, - }, - }, - }, - }, - }, } for testName, testData := range tests { @@ -448,7 +416,6 @@ func Test_IncrementTimestamp(t *testing.T) { func TestDistributorPushConcurrently(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil distributors, ingesters := prepare(t, 1, 5, limits, nil) @@ -552,20 +519,6 @@ func Test_SortLabelsOnPush(t *testing.T) { topVal := ingester.Peek() require.Equal(t, `{a="b", buzz="f", service_name="foo"}`, topVal.Streams[0].Labels) }) - - t.Run("with service_name added during ingestion", func(t *testing.T) { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - ingester := &mockIngester{} - distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - - request := makeWriteRequest(10, 10) - request.Streams[0].Labels = `{buzz="f", x="y", a="b"}` - _, err := distributors[0].Push(ctx, request) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{a="b", buzz="f", service_name="unknown_service", x="y"}`, topVal.Streams[0].Labels) - }) } func Test_TruncateLogLines(t *testing.T) { @@ -603,7 +556,7 @@ func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) { distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) - require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10))) + require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\"}", 10))) topVal := ingester.Peek() require.Nil(t, topVal) }) @@ -885,53 +838,9 @@ func TestParseStreamLabels(t *testing.T) { expectedErr error generateLimits func() *validation.Limits }{ - { - name: "service name label mapping disabled", - generateLimits: func() *validation.Limits { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil - return limits - }, - origLabels: `{foo="bar"}`, - expectedLabels: labels.Labels{ - { - Name: "foo", - Value: "bar", - }, - }, - }, - { - name: "no labels defined - service name label mapping disabled", - generateLimits: func() *validation.Limits { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - limits.DiscoverServiceName = nil - return limits - }, - origLabels: `{}`, - expectedErr: fmt.Errorf(validation.MissingLabelsErrorMsg), - }, - { - name: "service name label enabled", - origLabels: `{foo="bar"}`, - generateLimits: func() *validation.Limits { - return defaultLimit - }, - expectedLabels: labels.Labels{ - { - Name: "foo", - Value: "bar", - }, - { - Name: labelServiceName, - Value: serviceUnknown, - }, - }, - }, { name: "service name label should not get counted against max labels count", - origLabels: `{foo="bar"}`, + origLabels: `{foo="bar", service_name="unknown_service"}`, generateLimits: func() *validation.Limits { limits := &validation.Limits{} flagext.DefaultValues(limits) @@ -944,33 +853,8 @@ func TestParseStreamLabels(t *testing.T) { Value: "bar", }, { - Name: labelServiceName, - Value: serviceUnknown, - }, - }, - }, - { - name: "use label service as service name", - origLabels: `{container="nginx", foo="bar", service="auth"}`, - generateLimits: func() *validation.Limits { - return defaultLimit - }, - expectedLabels: labels.Labels{ - { - Name: "container", - Value: "nginx", - }, - { - Name: "foo", - Value: "bar", - }, - { - Name: "service", - Value: "auth", - }, - { - Name: labelServiceName, - Value: "auth", + Name: loghttp_push.LabelServiceName, + Value: loghttp_push.ServiceUnknown, }, }, }, @@ -1562,7 +1446,6 @@ func Test_DetectLogLevels(t *testing.T) { flagext.DefaultValues(limits) limits.DiscoverLogLevels = discoverLogLevels - limits.DiscoverServiceName = nil limits.AllowStructuredMetadata = true return limits, &mockIngester{} } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 2ef4c78cff94a..b4f730a58a7fa 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -157,7 +157,14 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea validation.DiscardedSamples.WithLabelValues(validation.MissingLabels, ctx.userID).Inc() return fmt.Errorf(validation.MissingLabelsErrorMsg) } + numLabelNames := len(ls) + // This is a special case that's often added by the Loki infrastructure. It may result in allowing one extra label + // if incoming requests already have a service_name + if ls.Has(push.LabelServiceName) { + numLabelNames-- + } + if numLabelNames > ctx.maxLabelNamesPerSeries { updateMetrics(validation.MaxLabelNamesPerSeries, ctx.userID, stream) return fmt.Errorf(validation.MaxLabelNamesPerSeriesErrorMsg, stream.Labels, numLabelNames, ctx.maxLabelNamesPerSeries) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index c63b32c6111bb..a9b174952f286 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -10,6 +10,8 @@ import ( "net/http" "time" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/go-kit/log/level" "github.com/grafana/loki/pkg/push" @@ -25,7 +27,6 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/unmarshal" @@ -57,7 +58,11 @@ var ( linesReceivedStats = analytics.NewCounter("distributor_lines_received") ) -const applicationJSON = "application/json" +const ( + applicationJSON = "application/json" + LabelServiceName = "service_name" + ServiceUnknown = "unknown_service" +) type TenantsRetention interface { RetentionPeriodFor(userID string, lbs labels.Labels) time.Duration @@ -65,6 +70,7 @@ type TenantsRetention interface { type Limits interface { OTLPConfig(userID string) OTLPConfig + DiscoverServiceName(userID string) []string } type EmptyLimits struct{} @@ -73,6 +79,10 @@ func (EmptyLimits) OTLPConfig(string) OTLPConfig { return DefaultOTLPConfig(GlobalOTLPConfig{}) } +func (EmptyLimits) DiscoverServiceName(string) []string { + return nil +} + type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) type RequestParserWrapper func(inner RequestParser) RequestParser @@ -148,7 +158,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete return req, nil } -func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, _ Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -217,16 +227,33 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.ContentType = contentType pushStats.ContentEncoding = contentEncoding - for _, s := range req.Streams { + discoverServiceName := limits.DiscoverServiceName(userID) + for i := range req.Streams { + s := req.Streams[i] pushStats.StreamLabelsSize += int64(len(s.Labels)) - var lbs labels.Labels - if tenantsRetention != nil || tracker != nil { - lbs, err = syntax.ParseLabels(s.Labels) - if err != nil { - return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) + lbs, err := syntax.ParseLabels(s.Labels) + if err != nil { + return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) + } + + if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 { + serviceName := ServiceUnknown + for _, labelName := range discoverServiceName { + if labelVal := lbs.Get(labelName); labelVal != "" { + serviceName = labelVal + break + } } + + lb := labels.NewBuilder(lbs) + lbs = lb.Set(LabelServiceName, serviceName).Labels() + s.Labels = lbs.String() + + // Remove the added label after it's added to the stream so it's not consumed by subsequent steps + lbs = lb.Del(LabelServiceName).Labels() } + var retentionPeriod time.Duration if tenantsRetention != nil { retentionPeriod = tenantsRetention.RetentionPeriodFor(userID, lbs) @@ -249,6 +276,8 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe pushStats.MostRecentEntryTimestamp = e.Timestamp } } + + req.Streams[i] = s } return &req, pushStats, nil diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index ac83492d62eba..0484afe31c3b0 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -54,10 +54,12 @@ func TestParseRequest(t *testing.T) { contentType string contentEncoding string valid bool + enableServiceDiscovery bool expectedStructuredMetadataBytes int expectedBytes int expectedLines int expectedBytesUsageTracker map[string]float64 + expectedLabels labels.Labels }{ { path: `/loki/api/v1/push`, @@ -79,6 +81,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -89,6 +92,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -106,6 +110,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -116,6 +121,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -133,6 +139,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -143,6 +150,7 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), }, { path: `/loki/api/v1/push`, @@ -196,6 +204,29 @@ func TestParseRequest(t *testing.T) { expectedBytes: len("fizzbuzz") + 2*len("a") + 2*len("b"), expectedLines: 1, expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuzz") + 2*len("a") + 2*len("b"))}, + expectedLabels: labels.FromStrings("foo", "bar2"), + }, + { + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "foo": "bar2", "job": "stuff" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + enableServiceDiscovery: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2", job="stuff"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2", "job", "stuff", LabelServiceName, "stuff"), + }, + { + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "foo": "bar2" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + enableServiceDiscovery: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("foo", "bar2", LabelServiceName, ServiceUnknown), }, } { t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) { @@ -212,7 +243,7 @@ func TestParseRequest(t *testing.T) { } tracker := NewMockTracker() - data, err := ParseRequest(util_log.Logger, "fake", request, nil, nil, ParseLokiRequest, tracker) + data, err := ParseRequest(util_log.Logger, "fake", request, nil, &fakeLimits{test.enableServiceDiscovery}, ParseLokiRequest, tracker) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived previousStructuredMetadataBytesReceived += structuredMetadataBytesReceived @@ -231,6 +262,7 @@ func TestParseRequest(t *testing.T) { require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.Equal(t, test.expectedLabels.String(), data.Streams[0].Labels) require.InDeltaMapValuesf(t, test.expectedBytesUsageTracker, tracker.receivedBytes, 0.0, "%s != %s", test.expectedBytesUsageTracker, tracker.receivedBytes) } else { assert.Errorf(t, err, "Should give error for %d", index) @@ -246,6 +278,33 @@ func TestParseRequest(t *testing.T) { } } +type fakeLimits struct { + enabled bool +} + +func (l *fakeLimits) OTLPConfig(_ string) OTLPConfig { + return OTLPConfig{} +} + +func (l *fakeLimits) DiscoverServiceName(_ string) []string { + if !l.enabled { + return nil + } + + return []string{ + "service", + "app", + "application", + "name", + "app_kubernetes_io_name", + "container", + "container_name", + "component", + "workload", + "job", + } +} + type MockCustomTracker struct { receivedBytes map[string]float64 discardedBytes map[string]float64