Skip to content

Commit

Permalink
feat: Add a new enforced_labels limit (#15704)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
Introduce the new experimental `enforced_labels` limit.
By default it is empty, but when configured, Loki only accepts push requests whose streams carry all of the enforced labels.
  • Loading branch information
DylanGuedes authored Jan 14, 2025
1 parent aec8e96 commit a2c8ec7
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3941,6 +3941,12 @@ otlp_config:
# CLI flag: -limits.block-ingestion-status-code
[block_ingestion_status_code: <int> | default = 260]

# List of labels that must be present in the stream. If any of the labels are
# missing, the stream will be discarded. This flag configures it globally for
# all tenants. Experimental.
# CLI flag: -validation.enforced-labels
[enforced_labels: <list of strings> | default = []]

# The number of partitions a tenant's data should be sharded to when using kafka
# ingestion. Tenants are sharded across partitions using shuffle-sharding. 0
# disables shuffle sharding and tenant is sharded across all partitions.
Expand Down
31 changes: 31 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,16 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
continue
}

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID).Add(float64(discardedBytes))
continue
}

n := 0
pushSize := 0
prevTs := stream.Entries[0].Timestamp
Expand Down Expand Up @@ -733,6 +743,27 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}
}

// missingEnforcedLabels reports whether the stream is missing any of the labels
// enforced for the given tenant.
//
// The second return value lists the name of every enforced label that is absent
// from lbs (all of them, not only the first), in the order the limits return
// them. When no labels are enforced, or all of them are present, it returns
// false and an empty, non-nil slice.
func (d *Distributor) missingEnforcedLabels(lbs labels.Labels, tenantID string) (bool, []string) {
	// Kept as an empty, non-nil slice so the "nothing missing" result is the
	// same value callers received before.
	missingLbs := []string{}

	// With no enforced labels configured the loop body never runs, so that case
	// needs no special handling.
	for _, lb := range d.validator.Limits.EnforcedLabels(tenantID) {
		if !lbs.Has(lb) {
			missingLbs = append(missingLbs, lb)
		}
	}

	return len(missingLbs) > 0, missingLbs
}

func (d *Distributor) trackDiscardedData(
ctx context.Context,
req *logproto.PushRequest,
Expand Down
53 changes: 53 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,59 @@ func Test_IncrementTimestamp(t *testing.T) {
}
}

// Test_MissingEnforcedLabels checks missingEnforcedLabels against streams that
// carry all, some, or none of the enforced labels `app` and `env`.
func Test_MissingEnforcedLabels(t *testing.T) {
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)

	limits.EnforcedLabels = []string{"app", "env"}

	distributors, _ := prepare(t, 1, 5, limits, nil)
	d := distributors[0]

	for _, tc := range []struct {
		name        string
		streamLbs   map[string]string
		wantMissing []string
	}{
		{
			name:      "request with all required labels",
			streamLbs: map[string]string{"app": "foo", "env": "prod"},
		},
		{
			name:        "request missing the `app` label",
			streamLbs:   map[string]string{"env": "prod"},
			wantMissing: []string{"app"},
		},
		{
			name:        "request missing all required labels",
			streamLbs:   map[string]string{"pod": "distributor-abc"},
			wantMissing: []string{"app", "env"},
		},
	} {
		missing, missingLabels := d.missingEnforcedLabels(labels.FromMap(tc.streamLbs), "test")

		if len(tc.wantMissing) == 0 {
			// Nothing is expected to be missing.
			assert.False(t, missing, tc.name)
			assert.Empty(t, missingLabels, tc.name)
			continue
		}

		assert.True(t, missing, tc.name)
		assert.EqualValues(t, tc.wantMissing, missingLabels, tc.name)
	}
}

// Test_PushWithEnforcedLabels drives Push with the enforced_labels limit set and
// unset, checking that a request lacking the enforced labels is rejected with a
// 400 error and that compliant or unrestricted requests succeed.
func Test_PushWithEnforcedLabels(t *testing.T) {
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)
	limits.EnforcedLabels = []string{"app", "env"}

	// Per the original author's note, makeWriteRequest only carries a
	// `{foo="bar"}` label, so both enforced labels are absent.
	unlabelled := makeWriteRequest(100, 100)
	enforcing, _ := prepare(t, 1, 3, limits, nil)

	// Enforced labels configured and none of them present: Push must fail.
	_, err := enforcing[0].Push(ctx, unlabelled)
	require.Error(t, err)
	wantErr := httpgrpc.Errorf(http.StatusBadRequest, validation.MissingEnforcedLabelsErrorMsg, "app,env", "test")
	require.EqualError(t, err, wantErr.Error())

	// Enforced labels configured and all of them present: Push must succeed.
	labelled := makeWriteRequestWithLabels(100, 100, []string{`{app="foo", env="prod"}`}, false, false, false)
	_, err = enforcing[0].Push(ctx, labelled)
	require.NoError(t, err)

	// No enforced labels at all: the same request must still be accepted.
	limits.EnforcedLabels = []string{}
	permissive, _ := prepare(t, 1, 3, limits, nil)
	_, err = permissive[0].Push(ctx, labelled)
	require.NoError(t, err)
}

func TestDistributorPushConcurrently(t *testing.T) {
limits := &validation.Limits{}
flagext.DefaultValues(limits)
Expand Down
1 change: 1 addition & 0 deletions pkg/distributor/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Limits interface {

BlockIngestionUntil(userID string) time.Time
BlockIngestionStatusCode(userID string) int
EnforcedLabels(userID string) []string

IngestionPartitionsTenantShardSize(userID string) int
}
2 changes: 2 additions & 0 deletions pkg/distributor/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type validationContext struct {

blockIngestionUntil time.Time
blockIngestionStatusCode int
enforcedLabels []string

userID string
}
Expand All @@ -80,6 +81,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val
maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID),
blockIngestionUntil: v.BlockIngestionUntil(userID),
blockIngestionStatusCode: v.BlockIngestionStatusCode(userID),
enforcedLabels: v.EnforcedLabels(userID),
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ type Limits struct {

BlockIngestionUntil dskit_flagext.Time `yaml:"block_ingestion_until" json:"block_ingestion_until"`
BlockIngestionStatusCode int `yaml:"block_ingestion_status_code" json:"block_ingestion_status_code"`
EnforcedLabels []string `yaml:"enforced_labels" json:"enforced_labels" category:"experimental"`

IngestionPartitionsTenantShardSize int `yaml:"ingestion_partitions_tenant_shard_size" json:"ingestion_partitions_tenant_shard_size" category:"experimental"`

Expand Down Expand Up @@ -445,6 +446,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.Var(&l.BlockIngestionUntil, "limits.block-ingestion-until", "Block ingestion until the configured date. The time should be in RFC3339 format.")
f.IntVar(&l.BlockIngestionStatusCode, "limits.block-ingestion-status-code", defaultBlockedIngestionStatusCode, "HTTP status code to return when ingestion is blocked. If 200, the ingestion will be blocked without returning an error to the client. By Default, a custom status code (260) is returned to the client along with an error message.")
f.Var((*dskit_flagext.StringSlice)(&l.EnforcedLabels), "validation.enforced-labels", "List of labels that must be present in the stream. If any of the labels are missing, the stream will be discarded. This flag configures it globally for all tenants. Experimental.")

f.IntVar(&l.IngestionPartitionsTenantShardSize, "limits.ingestion-partition-tenant-shard-size", 0, "The number of partitions a tenant's data should be sharded to when using kafka ingestion. Tenants are sharded across partitions using shuffle-sharding. 0 disables shuffle sharding and tenant is sharded across all partitions.")

Expand Down Expand Up @@ -1111,6 +1113,10 @@ func (o *Overrides) BlockIngestionStatusCode(userID string) int {
return o.getOverridesForUser(userID).BlockIngestionStatusCode
}

// EnforcedLabels returns the EnforcedLabels limit that applies to the given
// user, as resolved by getOverridesForUser.
func (o *Overrides) EnforcedLabels(userID string) []string {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.EnforcedLabels
}

// ShardAggregations returns the ShardAggregations limit that applies to the
// given user, as resolved by getOverridesForUser.
func (o *Overrides) ShardAggregations(userID string) []string {
	userLimits := o.getOverridesForUser(userID)
	return userLimits.ShardAggregations
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/validation/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,8 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
},
},
{
Expand All @@ -245,7 +246,8 @@ ruler_remote_write_headers:
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
},
},
{
Expand All @@ -269,6 +271,7 @@ retention_stream:
// Rest from new defaults
RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
},
},
{
Expand All @@ -290,7 +293,8 @@ reject_old_samples: true
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
},
},
{
Expand All @@ -313,7 +317,8 @@ query_timeout: 5m
Selector: `{a="b"}`,
},
},
OTLPConfig: defaultOTLPConfig,
OTLPConfig: defaultOTLPConfig,
EnforcedLabels: []string{},
},
},
} {
Expand Down
2 changes: 2 additions & 0 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
StructuredMetadataTooManyErrorMsg = "stream '%s' has too many structured metadata labels: '%d', limit: '%d'. Please see `limits_config.max_structured_metadata_entries_count` or contact your Loki administrator to increase it."
BlockedIngestion = "blocked_ingestion"
BlockedIngestionErrorMsg = "ingestion blocked for user %s until '%s' with status code '%d'"
MissingEnforcedLabels = "missing_enforced_labels"
MissingEnforcedLabelsErrorMsg = "missing required labels %s for user %s"
)

type ErrStreamRateLimit struct {
Expand Down

0 comments on commit a2c8ec7

Please sign in to comment.